package storm.kafka.trident;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import kafka.api.FetchRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import storm.kafka.KafkaConfig;
import storm.kafka.StaticPartitionConnections;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IPartitionedTridentSpout;
import storm.trident.topology.TransactionAttempt;

/* loaded from: input_file:storm/kafka/trident/TransactionalTridentKafkaSpout.class */
/**
 * Partitioned Trident spout that reads a Kafka topic through {@link SimpleConsumer}
 * connections to a statically configured host list.
 *
 * <p>The configured hosts are cast to {@link KafkaConfig.StaticHosts}; the number of
 * partitions exposed to Trident is {@code hosts.size() * partitionsPerHost}.
 */
public class TransactionalTridentKafkaSpout implements IPartitionedTridentSpout<Map> {
    /** Spout configuration supplied at construction time. */
    TridentKafkaConfig _config;

    /**
     * Random identifier generated once per spout object. It is handed to
     * {@code KafkaUtils.emitPartitionBatchNew} and later compared against the
     * {@code "instanceId"} entry of a batch's metadata when re-emitting.
     */
    String _topologyInstanceId = UUID.randomUUID().toString();

    /** Coordinator that delegates readiness and shutdown to {@code _config.coordinator}. */
    class Coordinator implements IPartitionedTridentSpout.Coordinator {
        Coordinator() {
        }

        /**
         * @return the partition count computed by
         *         {@link TransactionalTridentKafkaSpout#computeNumPartitions()}, widened to {@code long}
         */
        public long numPartitions() {
            return computeNumPartitions();
        }

        /** Closes the coordinator held by the spout configuration. */
        public void close() {
            _config.coordinator.close();
        }

        /**
         * @param txid value forwarded unchanged to the configured coordinator
         * @return whatever the configured coordinator's {@code isReady} reports
         */
        public boolean isReady(long txid) {
            return _config.coordinator.isReady(txid);
        }
    }

    /** Emitter that fetches messages per partition and pushes them to the collector. */
    class Emitter implements IPartitionedTridentSpout.Emitter<Map> {
        /** Consumer connections, looked up by global partition index. */
        StaticPartitionConnections _connections;

        /** Partitions per host, copied from the static host configuration. */
        int partitionsPerHost;

        /**
         * Creates the connection holder from the spout configuration.
         *
         * @throws ClassCastException if the configured hosts are not {@link KafkaConfig.StaticHosts}
         */
        public Emitter() {
            this._connections = new StaticPartitionConnections(_config);
            this.partitionsPerHost = ((KafkaConfig.StaticHosts) _config.hosts).partitionsPerHost;
        }

        /**
         * Emits a new batch for a partition by delegating to
         * {@code KafkaUtils.emitPartitionBatchNew}.
         *
         * @param attempt      transaction attempt the batch belongs to
         * @param collector    collector that receives the emitted tuples
         * @param partition    global partition index
         * @param lastMeta     metadata passed through to the helper unchanged
         * @return the metadata map produced by the helper
         */
        public Map emitPartitionBatchNew(TransactionAttempt attempt, TridentCollector collector, int partition, Map lastMeta) {
            SimpleConsumer consumer = this._connections.getConsumer(partition);
            return KafkaUtils.emitPartitionBatchNew(_config, partition, consumer, attempt, collector, lastMeta, _topologyInstanceId);
        }

        /**
         * Re-emits a batch described by {@code meta}.
         *
         * <p>Reads {@code "instanceId"}, {@code "offset"} and {@code "nextOffset"} from the
         * metadata. When {@code forceFromStart} is set and the metadata's instance id is
         * not this spout's id, nothing is emitted. Otherwise messages are fetched starting at
         * {@code "offset"} and emitted until the running offset equals {@code "nextOffset"}.
         * If the fetched messages run out first, the method simply returns.
         *
         * @param attempt   transaction attempt the batch belongs to
         * @param collector collector that receives the emitted tuples
         * @param partition global partition index
         * @param meta      batch metadata; must contain {@code Long} values for
         *                  {@code "offset"} and {@code "nextOffset"}
         * @throws RuntimeException if the running offset moves past {@code "nextOffset"}
         */
        public void emitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, int partition, Map meta) {
            String instanceId = (String) meta.get("instanceId");
            // Compare from the always-initialised field so metadata without an
            // "instanceId" entry is treated as a foreign batch instead of raising
            // a NullPointerException.
            if (_config.forceFromStart && !_topologyInstanceId.equals(instanceId)) {
                return;
            }

            SimpleConsumer consumer = this._connections.getConsumer(partition);
            long offset = ((Long) meta.get("offset")).longValue();
            long endOffset = ((Long) meta.get("nextOffset")).longValue();

            // The global partition index is reduced modulo partitionsPerHost before
            // being passed to the fetch request (presumably the host-local partition id).
            FetchRequest request = new FetchRequest(_config.topic, partition % this.partitionsPerHost, offset, _config.fetchSizeBytes);
            Iterator messages = consumer.fetch(request).iterator();
            while (messages.hasNext()) {
                MessageAndOffset next = (MessageAndOffset) messages.next();
                if (offset == endOffset) {
                    // Reached the recorded end of the batch; anything further belongs to a later batch.
                    return;
                }
                if (offset > endOffset) {
                    throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
                }
                KafkaUtils.emit(_config, attempt, collector, next.message());
                // Advance the cursor to the offset reported alongside this message.
                offset = next.offset();
            }
        }

        /** Closes the underlying partition connections. */
        public void close() {
            this._connections.close();
        }
    }

    /**
     * @param tridentKafkaConfig configuration used by the coordinator and emitter
     */
    public TransactionalTridentKafkaSpout(TridentKafkaConfig tridentKafkaConfig) {
        this._config = tridentKafkaConfig;
    }

    /**
     * @param conf    unused
     * @param context unused
     * @return a new {@link Coordinator} bound to this spout
     */
    public IPartitionedTridentSpout.Coordinator getCoordinator(Map conf, TopologyContext context) {
        return new Coordinator();
    }

    /**
     * @param conf    unused
     * @param context unused
     * @return a new {@link Emitter} bound to this spout
     */
    public IPartitionedTridentSpout.Emitter getEmitter(Map conf, TopologyContext context) {
        return new Emitter();
    }

    /** @return the output fields declared by the configured scheme */
    public Fields getOutputFields() {
        return this._config.scheme.getOutputFields();
    }

    /**
     * Computes the total partition count as number of hosts times partitions per host.
     *
     * @return {@code hosts.size() * partitionsPerHost} from the static host configuration
     * @throws ClassCastException if the configured hosts are not {@link KafkaConfig.StaticHosts}
     */
    public int computeNumPartitions() {
        KafkaConfig.StaticHosts staticHosts = (KafkaConfig.StaticHosts) this._config.hosts;
        int hostCount = staticHosts.hosts.size();
        return hostCount * staticHosts.partitionsPerHost;
    }

    /** @return always {@code null} */
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
