package storm.kafka;

import backtype.storm.Config;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BasePartitionedTransactionalSpout;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.transactional.partitioned.IPartitionedTransactionalSpout;
import backtype.storm.tuple.Fields;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import kafka.api.FetchRequest;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import storm.kafka.KafkaConfig;

/* loaded from: input_file:storm/kafka/TransactionalKafkaSpout.class */
public class TransactionalKafkaSpout extends BasePartitionedTransactionalSpout<BatchMeta> {
    public static final String ATTEMPT_FIELD = TransactionalKafkaSpout.class.getCanonicalName() + "/attempt";
    KafkaConfig _config;

    /* loaded from: input_file:storm/kafka/TransactionalKafkaSpout$Coordinator.class */
    class Coordinator implements IPartitionedTransactionalSpout.Coordinator {
        Coordinator() {
        }

        public int numPartitions() {
            return TransactionalKafkaSpout.this.computeNumPartitions();
        }

        public void close() {
        }

        public boolean isReady() {
            return true;
        }
    }

    /* loaded from: input_file:storm/kafka/TransactionalKafkaSpout$Emitter.class */
    class Emitter implements IPartitionedTransactionalSpout.Emitter<BatchMeta> {
        StaticPartitionConnections _connections;
        int partitionsPerHost;

        public Emitter() {
            this._connections = new StaticPartitionConnections(TransactionalKafkaSpout.this._config);
            this.partitionsPerHost = ((KafkaConfig.StaticHosts) TransactionalKafkaSpout.this._config.hosts).partitionsPerHost;
        }

        public BatchMeta emitPartitionBatchNew(TransactionAttempt transactionAttempt, BatchOutputCollector batchOutputCollector, int i, BatchMeta batchMeta) {
            return KafkaUtils.emitPartitionBatchNew(TransactionalKafkaSpout.this._config, i, this._connections.getConsumer(i), transactionAttempt, batchOutputCollector, batchMeta);
        }

        public void emitPartitionBatch(TransactionAttempt transactionAttempt, BatchOutputCollector batchOutputCollector, int i, BatchMeta batchMeta) {
            ByteBufferMessageSet fetch = this._connections.getConsumer(i).fetch(new FetchRequest(TransactionalKafkaSpout.this._config.topic, i % this.partitionsPerHost, batchMeta.offset, TransactionalKafkaSpout.this._config.fetchSizeBytes));
            long j = batchMeta.offset;
            Iterator it = fetch.iterator();
            while (it.hasNext()) {
                MessageAndOffset messageAndOffset = (MessageAndOffset) it.next();
                if (j == batchMeta.nextOffset) {
                    return;
                }
                if (j > batchMeta.nextOffset) {
                    throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
                }
                KafkaUtils.emit(TransactionalKafkaSpout.this._config, transactionAttempt, batchOutputCollector, messageAndOffset.message());
                j = messageAndOffset.offset();
            }
        }

        public void close() {
            this._connections.close();
        }
    }

    public TransactionalKafkaSpout(KafkaConfig kafkaConfig) {
        this._config = kafkaConfig;
    }

    public IPartitionedTransactionalSpout.Coordinator getCoordinator(Map map, TopologyContext topologyContext) {
        return new Coordinator();
    }

    public IPartitionedTransactionalSpout.Emitter getEmitter(Map map, TopologyContext topologyContext) {
        return new Emitter();
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        ArrayList arrayList = new ArrayList(this._config.scheme.getOutputFields().toList());
        arrayList.add(0, ATTEMPT_FIELD);
        outputFieldsDeclarer.declare(new Fields(arrayList));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int computeNumPartitions() {
        KafkaConfig.StaticHosts staticHosts = (KafkaConfig.StaticHosts) this._config.hosts;
        return staticHosts.hosts.size() * staticHosts.partitionsPerHost;
    }

    public Map<String, Object> getComponentConfiguration() {
        Config config = new Config();
        config.registerSerialization(BatchMeta.class);
        return config;
    }
}
