package elephantdb.hadoop;

import elephantdb.DomainSpec;
import elephantdb.Utils;
import elephantdb.document.KeyValDocument;
import elephantdb.persistence.Persistence;
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;

/* loaded from: input_file:elephantdb/hadoop/ElephantOutputFormat.class */
public class ElephantOutputFormat implements OutputFormat<IntWritable, ElephantRecordWritable> {
    public static Logger LOG = Logger.getLogger(ElephantOutputFormat.class);
    public static final String ARGS_CONF = "elephant.output.args";

    /* loaded from: input_file:elephantdb/hadoop/ElephantOutputFormat$Args.class */
    public static class Args implements Serializable {
        public DomainSpec spec;
        public String outputDirHdfs;

        public Args(DomainSpec domainSpec, String str) {
            this.spec = domainSpec;
            this.outputDirHdfs = str;
        }
    }

    /* loaded from: input_file:elephantdb/hadoop/ElephantOutputFormat$ElephantRecordWriter.class */
    public class ElephantRecordWriter implements RecordWriter<IntWritable, ElephantRecordWritable>, Closeable {
        FileSystem fileSystem;
        Args args;
        Progressable progressable;
        LocalElephantManager localManager;
        Map<Integer, Persistence> lps = new HashMap();
        int numWritten = 0;
        long lastCheckpoint = System.currentTimeMillis();

        public ElephantRecordWriter(Configuration configuration, Args args, Progressable progressable) throws IOException {
            this.fileSystem = Utils.getFS(args.outputDirHdfs, configuration);
            this.args = args;
            this.progressable = progressable;
            this.localManager = new LocalElephantManager(this.fileSystem, args.spec, LocalElephantManager.getTmpDirs(configuration));
        }

        private Persistence retrieveShard(int i) throws IOException {
            Persistence openPersistenceForAppend;
            if (this.lps.containsKey(Integer.valueOf(i))) {
                openPersistenceForAppend = this.lps.get(Integer.valueOf(i));
            } else {
                openPersistenceForAppend = this.args.spec.getCoordinator().openPersistenceForAppend(this.localManager.downloadRemoteShard("" + i, null), this.args.spec.getPersistenceOptions());
                this.lps.put(Integer.valueOf(i), openPersistenceForAppend);
                progress();
            }
            return openPersistenceForAppend;
        }

        public void write(IntWritable intWritable, ElephantRecordWritable elephantRecordWritable) throws IOException {
            retrieveShard(intWritable.get()).index(new KeyValDocument(elephantRecordWritable.key, elephantRecordWritable.value));
            bumpProgress();
        }

        public void bumpProgress() {
            this.numWritten++;
            if (this.numWritten % 25000 == 0) {
                long currentTimeMillis = System.currentTimeMillis();
                long j = currentTimeMillis - this.lastCheckpoint;
                this.lastCheckpoint = currentTimeMillis;
                ElephantOutputFormat.LOG.info("Wrote last 25000 records in " + j + " ms");
                this.localManager.progress();
            }
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            close(null);
        }

        public void close(Reporter reporter) throws IOException {
            for (Integer num : this.lps.keySet()) {
                String localTmpDir = this.localManager.localTmpDir("" + num);
                ElephantOutputFormat.LOG.info("Closing LP for shard " + num + " at " + localTmpDir);
                this.lps.get(num).close();
                ElephantOutputFormat.LOG.info("Closed LP for shard " + num + " at " + localTmpDir);
                progress();
                String str = this.args.outputDirHdfs + "/" + num;
                int i = 4;
                while (this.fileSystem.exists(new Path(str)) && i > 0) {
                    ElephantOutputFormat.LOG.info("Deleting existing shard " + num + " at " + str);
                    this.fileSystem.delete(new Path(str), true);
                    i--;
                }
                if (this.fileSystem.exists(new Path(str)) && i == 0) {
                    throw new IOException("Failed to delete shard " + num + " at " + str + " after " + i + " attempts!");
                }
                ElephantOutputFormat.LOG.info("Deleted existing shard " + num + " at " + str);
                ElephantOutputFormat.LOG.info("Copying " + localTmpDir + " to " + str);
                this.fileSystem.copyFromLocalFile(new Path(localTmpDir), new Path(str));
                ElephantOutputFormat.LOG.info("Copied " + localTmpDir + " to " + str);
                progress();
            }
            this.localManager.cleanup();
        }

        private void progress() {
            if (this.progressable != null) {
                this.progressable.progress();
            }
        }
    }

    public RecordWriter<IntWritable, ElephantRecordWritable> getRecordWriter(FileSystem fileSystem, JobConf jobConf, String str, Progressable progressable) throws IOException {
        return new ElephantRecordWriter(jobConf, (Args) Utils.getObject(jobConf, ARGS_CONF), progressable);
    }

    public void checkOutputSpecs(FileSystem fileSystem, JobConf jobConf) throws IOException {
        Args args = (Args) Utils.getObject(jobConf, ARGS_CONF);
        FileSystem fs = Utils.getFS(args.outputDirHdfs, jobConf);
        if (jobConf.getBoolean("mapred.reduce.tasks.speculative.execution", true)) {
            throw new InvalidJobConfException("Speculative execution should be false");
        }
        if (fs.exists(new Path(args.outputDirHdfs))) {
            throw new InvalidJobConfException("Output dir already exists " + args.outputDirHdfs);
        }
    }
}
