package clj_headlights;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.UUID;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:clj_headlights/PartitionedFileSink.class */
public class PartitionedFileSink extends PTransform<PCollection<KV<String, Iterable<String>>>, PCollection<String>> {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionedFileSink.class);
    public static final String TEMPORARY_FILENAME_SEPARATOR = "/temp/";
    protected final String outputPath;
    protected final String filename;

    /* loaded from: input_file:clj_headlights/PartitionedFileSink$BundleTempFileWriter.class */
    /**
     * Writes the lines of each keyed group to a temporary file and emits the key paired with the
     * temporary file's location.
     *
     * <p>The temporary file name is built from the base path given at construction, the element's
     * key, {@link PartitionedFileSink#TEMPORARY_FILENAME_SEPARATOR} and an identifier that is
     * regenerated at the start of every bundle.
     */
    public static class BundleTempFileWriter extends DoFn<KV<String, Iterable<String>>, KV<String, String>> {
        protected final String baseTemporaryFilename;
        protected String bundleId;

        /**
         * @param str base path that every temporary file name is prefixed with
         */
        public BundleTempFileWriter(String str) {
            this.baseTemporaryFilename = str;
        }

        /**
         * Builds the temporary file name for one key within one bundle.
         *
         * @param basePath prefix of the resulting name
         * @param bundleId identifier of the current bundle; becomes the last path segment
         * @param key      element key; placed directly after {@code basePath}
         * @return {@code basePath + key + TEMPORARY_FILENAME_SEPARATOR + bundleId}
         */
        protected static final String buildTemporaryFilename(String basePath, String bundleId, String key) {
            return basePath + key + PartitionedFileSink.TEMPORARY_FILENAME_SEPARATOR + bundleId;
        }

        /**
         * Assigns a fresh random identifier to the bundle that is about to be processed.
         *
         * @param startBundleContext unused
         * @throws IOException never thrown by this implementation; kept for signature compatibility
         */
        @DoFn.StartBundle
        public void startBundle(DoFn<KV<String, Iterable<String>>, KV<String, String>>.StartBundleContext startBundleContext) throws IOException {
            this.bundleId = UUID.randomUUID().toString();
        }

        /** Encodes {@code line} followed by a newline as UTF-8 bytes. */
        private static ByteBuffer wrap(String line) {
            return ByteBuffer.wrap((line + "\n").getBytes(StandardCharsets.UTF_8));
        }

        /**
         * Writes every line of the element's value to a temporary file, one line per row, and emits
         * {@code (key, temporary file location)} once the file has been closed.
         *
         * @param processContext supplies the input element and receives the output
         * @param boundedWindow  unused
         * @throws Exception if the temporary file cannot be created, written or closed
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<KV<String, Iterable<String>>, KV<String, String>>.ProcessContext processContext, BoundedWindow boundedWindow) throws Exception {
            KV<String, Iterable<String>> element = processContext.element();
            String key = element.getKey();
            ResourceId tempFile = FileSystems.matchNewResource(
                    buildTemporaryFilename(this.baseTemporaryFilename, this.bundleId, key), false);

            // try-with-resources guarantees the channel is closed even when a write fails.
            try (WritableByteChannel channel = FileSystems.create(tempFile, "text/plain")) {
                for (String line : element.getValue()) {
                    ByteBuffer buffer = wrap(line);
                    // A channel is allowed to accept only part of the buffer per call,
                    // so keep writing until the whole line has been consumed.
                    while (buffer.hasRemaining()) {
                        channel.write(buffer);
                    }
                }
            }

            // Emit only after a successful close so the file is complete when it is announced.
            processContext.output(KV.of(key, tempFile.toString()));
        }
    }

    /* loaded from: input_file:clj_headlights/PartitionedFileSink$SuccessfulBundleCopier.class */
    /**
     * Moves a temporary file to its final location {@code outputPath + key + "/" + filename} and
     * then attempts to delete the key's temporary location.
     *
     * <p>Input elements are {@code (key, temporary file location)} pairs.
     */
    public static class SuccessfulBundleCopier extends DoFn<KV<String, String>, String> {
        private final String outputPath;
        private final String filename;

        /**
         * @param str  output path prefix; the key is appended to it directly
         * @param str2 name given to every final file
         */
        public SuccessfulBundleCopier(String str, String str2) {
            this.outputPath = str;
            this.filename = str2;
        }

        /**
         * Renames the element's temporary file to its final name and deletes the key's temporary
         * location, ignoring files that are already missing.
         *
         * <p>NOTE(review): this method never calls {@code processContext.output(...)}, so it emits
         * no elements even though the declared output type is {@code String} — confirm whether
         * that is intended.
         *
         * @param processContext supplies the {@code (key, temporary file location)} element
         * @throws IOException if a file system operation fails for a reason other than a missing file
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<KV<String, String>, String>.ProcessContext processContext) throws IOException {
            KV<String, String> element = processContext.element();
            String key = element.getKey();
            String tempFileSpec = element.getValue();
            try {
                ResourceId tempFile = FileSystems.matchSingleFileSpec(tempFileSpec).resourceId();
                ResourceId destination =
                        FileSystems.matchNewResource(this.outputPath + key + "/" + this.filename, false);
                PartitionedFileSink.LOG.info("Moving {} to {}", tempFile, destination);
                FileSystems.rename(
                        Arrays.asList(tempFile),
                        Arrays.asList(destination),
                        MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);

                ResourceId tempLocation = FileSystems.matchSingleFileSpec(
                        this.outputPath + key + PartitionedFileSink.TEMPORARY_FILENAME_SEPARATOR).resourceId();
                FileSystems.delete(
                        Arrays.asList(tempLocation),
                        MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
            } catch (FileNotFoundException e) {
                // Best effort: a missing temporary file or temporary location is tolerated rather
                // than failing the element, but it is recorded so it can be diagnosed.
                PartitionedFileSink.LOG.debug(
                        "Temporary resource not found while finalizing key {} (temporary file {}); skipping",
                        key, tempFileSpec, e);
            }
        }
    }

    /**
     * Creates a sink that writes one file named {@code filename} per key under {@code outputPath}.
     *
     * <p>A path that does not start with {@code /} or {@code gs://} is reported through the error
     * log but is still accepted. A trailing {@code /} is appended to the stored output path when
     * it is missing.
     *
     * @param outputPath directory-like prefix under which per-key directories are written
     * @param filename   name of the file written for each key
     */
    public PartitionedFileSink(String outputPath, String filename) {
        if (!outputPath.matches("^/.+|^gs://.+")) {
            // Logged only (not thrown) so existing callers keep working.
            LOG.error("outputPath must be an absolute path, got: {}", outputPath);
        }
        this.outputPath = outputPath.endsWith("/") ? outputPath : outputPath + "/";
        this.filename = filename;
        // Log the normalized path so the message never shows a doubled slash.
        LOG.info("Writing to: " + this.outputPath + "{{ key }}/" + this.filename);
    }

    /**
     * Applies the two stages of the sink: first each keyed group is written to a temporary file,
     * then each temporary file is moved to its final location.
     *
     * @param pCollection keyed groups of lines to write
     * @return the output of the second stage
     */
    @Override
    public PCollection<String> expand(PCollection<KV<String, Iterable<String>>> pCollection) {
        BundleTempFileWriter tempWriter = new BundleTempFileWriter(this.outputPath);
        SuccessfulBundleCopier copier = new SuccessfulBundleCopier(this.outputPath, this.filename);

        PCollection<KV<String, String>> temporaryFiles = pCollection.apply(ParDo.of(tempWriter));
        return temporaryFiles.apply(ParDo.of(copier));
    }
}
