package storm.kafka;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import kafka.message.Message;
import org.apache.log4j.Logger;
import storm.kafka.KafkaConfig;
import storm.kafka.PartitionManager;

/* loaded from: input_file:storm/kafka/KafkaSpout.class */
public class KafkaSpout extends BaseRichSpout {
    public static final Logger LOG = Logger.getLogger(KafkaSpout.class);
    SpoutConfig _spoutConfig;
    SpoutOutputCollector _collector;
    PartitionCoordinator _coordinator;
    DynamicPartitionConnections _connections;
    ZkState _state;
    String _uuid = UUID.randomUUID().toString();
    long _lastUpdateMs = 0;
    int _currPartitionIndex = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:storm/kafka/KafkaSpout$EmitState.class */
    public enum EmitState {
        EMITTED_MORE_LEFT,
        EMITTED_END,
        NO_EMITTED
    }

    /* loaded from: input_file:storm/kafka/KafkaSpout$MessageAndRealOffset.class */
    public static class MessageAndRealOffset {
        public Message msg;
        public long offset;

        public MessageAndRealOffset(Message message, long j) {
            this.msg = message;
            this.offset = j;
        }
    }

    public KafkaSpout(SpoutConfig spoutConfig) {
        this._spoutConfig = spoutConfig;
    }

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this._collector = spoutOutputCollector;
        HashMap hashMap = new HashMap(map);
        List<String> list = this._spoutConfig.zkServers;
        if (list == null) {
            list = (List) map.get(Config.STORM_ZOOKEEPER_SERVERS);
        }
        Integer num = this._spoutConfig.zkPort;
        if (num == null) {
            num = Integer.valueOf(((Number) map.get(Config.STORM_ZOOKEEPER_PORT)).intValue());
        }
        hashMap.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, list);
        hashMap.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, num);
        hashMap.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, this._spoutConfig.zkRoot);
        this._state = new ZkState(hashMap);
        this._connections = new DynamicPartitionConnections(this._spoutConfig);
        int size = topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
        if (this._spoutConfig.hosts instanceof KafkaConfig.StaticHosts) {
            this._coordinator = new StaticCoordinator(this._connections, map, this._spoutConfig, this._state, topologyContext.getThisTaskIndex(), size, this._uuid);
        } else {
            this._coordinator = new ZkCoordinator(this._connections, map, this._spoutConfig, this._state, topologyContext.getThisTaskIndex(), size, this._uuid);
        }
    }

    public void close() {
        this._state.close();
    }

    public void nextTuple() {
        List<PartitionManager> myManagedPartitions = this._coordinator.getMyManagedPartitions();
        for (int i = 0; i < myManagedPartitions.size(); i++) {
            this._currPartitionIndex %= myManagedPartitions.size();
            EmitState next = myManagedPartitions.get(this._currPartitionIndex).next(this._collector);
            if (next != EmitState.EMITTED_MORE_LEFT) {
                this._currPartitionIndex = (this._currPartitionIndex + 1) % myManagedPartitions.size();
            }
            if (next != EmitState.NO_EMITTED) {
                break;
            }
        }
        if (System.currentTimeMillis() - this._lastUpdateMs > this._spoutConfig.stateUpdateIntervalMs) {
            commit();
        }
    }

    public void ack(Object obj) {
        PartitionManager.KafkaMessageId kafkaMessageId = (PartitionManager.KafkaMessageId) obj;
        PartitionManager manager = this._coordinator.getManager(kafkaMessageId.partition);
        if (manager != null) {
            manager.ack(Long.valueOf(kafkaMessageId.offset));
        }
    }

    public void fail(Object obj) {
        PartitionManager.KafkaMessageId kafkaMessageId = (PartitionManager.KafkaMessageId) obj;
        PartitionManager manager = this._coordinator.getManager(kafkaMessageId.partition);
        if (manager != null) {
            manager.fail(Long.valueOf(kafkaMessageId.offset));
        }
    }

    public void deactivate() {
        commit();
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(this._spoutConfig.scheme.getOutputFields());
    }

    private void commit() {
        this._lastUpdateMs = System.currentTimeMillis();
        Iterator<PartitionManager> it = this._coordinator.getMyManagedPartitions().iterator();
        while (it.hasNext()) {
            it.next().commit();
        }
    }

    public static void main(String[] strArr) {
    }
}
