package storm.kafka.trident;

import backtype.storm.Config;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import kafka.api.FetchRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import storm.kafka.DynamicPartitionConnections;
import storm.kafka.GlobalPartitionId;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IPartitionedTridentSpout;
import storm.trident.topology.TransactionAttempt;

/* loaded from: input_file:storm/kafka/trident/TransactionalTridentKafkaSpout.class */
public class TransactionalTridentKafkaSpout implements IPartitionedTridentSpout<Map<String, List>, GlobalPartitionId, Map> {
    TridentKafkaConfig _config;
    String _topologyInstanceId = UUID.randomUUID().toString();

    /* loaded from: input_file:storm/kafka/trident/TransactionalTridentKafkaSpout$Coordinator.class */
    class Coordinator implements IPartitionedTridentSpout.Coordinator<Map> {
        IBrokerReader reader;

        public Coordinator(Map map) {
            this.reader = KafkaUtils.makeBrokerReader(map, TransactionalTridentKafkaSpout.this._config);
        }

        public void close() {
            TransactionalTridentKafkaSpout.this._config.coordinator.close();
        }

        public boolean isReady(long j) {
            return TransactionalTridentKafkaSpout.this._config.coordinator.isReady(j);
        }

        /* renamed from: getPartitionsForBatch, reason: merged with bridge method [inline-methods] */
        public Map m7getPartitionsForBatch() {
            return this.reader.getCurrentBrokers();
        }
    }

    /* loaded from: input_file:storm/kafka/trident/TransactionalTridentKafkaSpout$Emitter.class */
    class Emitter implements IPartitionedTridentSpout.Emitter<Map<String, List>, GlobalPartitionId, Map> {
        DynamicPartitionConnections _connections;
        String _topologyName;

        public Emitter(Map map) {
            this._connections = new DynamicPartitionConnections(TransactionalTridentKafkaSpout.this._config);
            this._topologyName = (String) map.get(Config.TOPOLOGY_NAME);
        }

        public Map emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, GlobalPartitionId globalPartitionId, Map map) {
            return KafkaUtils.emitPartitionBatchNew(TransactionalTridentKafkaSpout.this._config, this._connections.register(globalPartitionId), globalPartitionId, tridentCollector, map, TransactionalTridentKafkaSpout.this._topologyInstanceId, this._topologyName);
        }

        public void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, GlobalPartitionId globalPartitionId, Map map) {
            String str = (String) map.get("instanceId");
            if (!TransactionalTridentKafkaSpout.this._config.forceFromStart || str.equals(TransactionalTridentKafkaSpout.this._topologyInstanceId)) {
                SimpleConsumer register = this._connections.register(globalPartitionId);
                long longValue = ((Long) map.get("offset")).longValue();
                long longValue2 = ((Long) map.get("nextOffset")).longValue();
                Iterator it = register.fetch(new FetchRequest(TransactionalTridentKafkaSpout.this._config.topic, globalPartitionId.partition, longValue, TransactionalTridentKafkaSpout.this._config.fetchSizeBytes)).iterator();
                while (it.hasNext()) {
                    MessageAndOffset messageAndOffset = (MessageAndOffset) it.next();
                    if (longValue == longValue2) {
                        return;
                    }
                    if (longValue > longValue2) {
                        throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
                    }
                    KafkaUtils.emit(TransactionalTridentKafkaSpout.this._config, tridentCollector, messageAndOffset.message());
                    longValue = messageAndOffset.offset();
                }
            }
        }

        public void close() {
            this._connections.clear();
        }

        public List<GlobalPartitionId> getOrderedPartitions(Map<String, List> map) {
            return KafkaUtils.getOrderedPartitions(map);
        }

        public void refreshPartitions(List<GlobalPartitionId> list) {
            this._connections.clear();
        }
    }

    public TransactionalTridentKafkaSpout(TridentKafkaConfig tridentKafkaConfig) {
        this._config = tridentKafkaConfig;
    }

    public IPartitionedTridentSpout.Coordinator getCoordinator(Map map, TopologyContext topologyContext) {
        return new Coordinator(map);
    }

    public IPartitionedTridentSpout.Emitter getEmitter(Map map, TopologyContext topologyContext) {
        return new Emitter(map);
    }

    public Fields getOutputFields() {
        return this._config.scheme.getOutputFields();
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
