package elephantdb.cascading;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import elephantdb.DomainSpec;
import elephantdb.Utils;
import elephantdb.hadoop.ElephantInputFormat;
import elephantdb.hadoop.ElephantOutputFormat;
import elephantdb.serialize.Serializer;
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;

/* loaded from: input_file:elephantdb/cascading/ElephantScheme.class */
public class ElephantScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {
    Serializer serializer;
    Gateway gateway;

    public ElephantScheme(Fields fields, Fields fields2, DomainSpec domainSpec, Gateway gateway) {
        setSourceFields(fields);
        setSinkFields(fields2);
        this.serializer = Utils.makeSerializer(domainSpec);
        this.gateway = gateway;
    }

    public Serializer getSerializer() {
        return this.serializer;
    }

    public void sourceConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
        jobConf.setInputFormat(ElephantInputFormat.class);
    }

    public void sinkConfInit(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
        jobConf.setOutputKeyClass(IntWritable.class);
        jobConf.setOutputValueClass(BytesWritable.class);
        jobConf.setOutputFormat(ElephantOutputFormat.class);
    }

    public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
        sourceCall.setContext(new Object[2]);
        ((Object[]) sourceCall.getContext())[0] = ((RecordReader) sourceCall.getInput()).createKey();
        ((Object[]) sourceCall.getContext())[1] = ((RecordReader) sourceCall.getInput()).createValue();
    }

    public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) throws IOException {
        NullWritable nullWritable = (NullWritable) ((Object[]) sourceCall.getContext())[0];
        BytesWritable bytesWritable = (BytesWritable) ((Object[]) sourceCall.getContext())[1];
        if (!((RecordReader) sourceCall.getInput()).next(nullWritable, bytesWritable)) {
            return false;
        }
        sourceCall.getIncomingEntry().setTuple(this.gateway.toTuple(getSerializer().deserialize(Utils.getBytes(bytesWritable))));
        return true;
    }

    public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
        Tuple tuple = sinkCall.getOutgoingEntry().getTuple();
        ((OutputCollector) sinkCall.getOutput()).collect(new IntWritable(tuple.getInteger(0)), new BytesWritable(getSerializer().serialize(this.gateway.fromTuple(tuple))));
    }

    public /* bridge */ /* synthetic */ void sinkConfInit(FlowProcess flowProcess, Tap tap, Object obj) {
        sinkConfInit((FlowProcess<JobConf>) flowProcess, (Tap<JobConf, RecordReader, OutputCollector>) tap, (JobConf) obj);
    }

    public /* bridge */ /* synthetic */ void sourceConfInit(FlowProcess flowProcess, Tap tap, Object obj) {
        sourceConfInit((FlowProcess<JobConf>) flowProcess, (Tap<JobConf, RecordReader, OutputCollector>) tap, (JobConf) obj);
    }
}
