package storm.kafka;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.transactional.state.TransactionalState;
import backtype.storm.utils.Utils;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import kafka.api.FetchRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import org.apache.log4j.Logger;

/* loaded from: input_file:storm/kafka/KafkaSpout.class */
public class KafkaSpout extends BaseRichSpout {
    public static final Logger LOG = Logger.getLogger(KafkaSpout.class);
    SpoutConfig _spoutConfig;
    SpoutOutputCollector _collector;
    TransactionalState _state;
    KafkaPartitionConnections _partitions;
    String _uuid = UUID.randomUUID().toString();
    Map<Integer, PartitionManager> _managers = new HashMap();
    long _lastUpdateMs = 0;
    int _currPartitionIndex = 0;
    List<Integer> _managedPartitions = new ArrayList();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:storm/kafka/KafkaSpout$EmitState.class */
    public enum EmitState {
        EMITTED_MORE_LEFT,
        EMITTED_END,
        NO_EMITTED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:storm/kafka/KafkaSpout$KafkaMessageId.class */
    public static class KafkaMessageId {
        public int partition;
        public long offset;

        public KafkaMessageId(int i, long j) {
            this.partition = i;
            this.offset = j;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:storm/kafka/KafkaSpout$PartitionManager.class */
    public class PartitionManager {
        Long _emittedToOffset;
        Long _committedTo;
        int _partition;
        SortedSet<Long> _pending = new TreeSet();
        LinkedList<MessageAndOffset> _waitingToEmit = new LinkedList<>();

        public PartitionManager(int i) {
            this._partition = i;
            KafkaSpout.LOG.info("Creating partition manager for " + i + ". Checking metadata in ZK");
            ZooMeta zooMeta = (ZooMeta) KafkaSpout.this._state.getData(committedPath());
            KafkaSpout.LOG.info("Creating consumer for partition " + this._partition);
            SimpleConsumer consumer = KafkaSpout.this._partitions.getConsumer(this._partition);
            KafkaSpout.LOG.info("Created consumer at " + consumer.host());
            int hostPartition = KafkaSpout.this._partitions.getHostPartition(this._partition);
            if (zooMeta == null || (!KafkaSpout.this._uuid.equals(zooMeta.id) && KafkaSpout.this._spoutConfig.forceFromStart)) {
                this._committedTo = Long.valueOf(consumer.getOffsetsBefore(KafkaSpout.this._spoutConfig.topic, hostPartition, KafkaSpout.this._spoutConfig.startOffsetTime, 1)[0]);
            } else {
                this._committedTo = Long.valueOf(zooMeta.offset);
            }
            KafkaSpout.LOG.info("Starting Kafka " + consumer.host() + ":" + hostPartition + " from offset " + this._committedTo);
            this._emittedToOffset = this._committedTo;
        }

        public EmitState next() {
            KafkaSpout.LOG.info("Filling from " + KafkaSpout.this._partitions.getConsumer(this._partition).host());
            if (this._waitingToEmit.isEmpty()) {
                fill();
            }
            KafkaSpout.LOG.info("Filled from " + KafkaSpout.this._partitions.getConsumer(this._partition).host());
            MessageAndOffset pollFirst = this._waitingToEmit.pollFirst();
            if (pollFirst == null) {
                KafkaSpout.LOG.info("Nothing to emit from " + KafkaSpout.this._partitions.getConsumer(this._partition).host());
                return EmitState.NO_EMITTED;
            }
            KafkaSpout.this._collector.emit(KafkaSpout.this._spoutConfig.scheme.deserialize(Utils.toByteArray(pollFirst.message().payload())), new KafkaMessageId(this._partition, actualOffset(pollFirst)));
            return this._waitingToEmit.size() > 0 ? EmitState.EMITTED_MORE_LEFT : EmitState.EMITTED_END;
        }

        private void fill() {
            Iterator it = KafkaSpout.this._partitions.getConsumer(this._partition).fetch(new FetchRequest(KafkaSpout.this._spoutConfig.topic, KafkaSpout.this._partitions.getHostPartition(this._partition), this._emittedToOffset.longValue(), KafkaSpout.this._spoutConfig.fetchSizeBytes)).iterator();
            while (it.hasNext()) {
                MessageAndOffset messageAndOffset = (MessageAndOffset) it.next();
                this._pending.add(Long.valueOf(actualOffset(messageAndOffset)));
                this._waitingToEmit.add(messageAndOffset);
                this._emittedToOffset = Long.valueOf(messageAndOffset.offset());
            }
        }

        public void ack(Long l) {
            this._pending.remove(l);
        }

        public void fail(Long l) {
            if (this._emittedToOffset.longValue() > l.longValue()) {
                this._emittedToOffset = l;
                this._pending.tailSet(l).clear();
            }
        }

        public void commit() {
            long longValue = this._pending.isEmpty() ? this._emittedToOffset.longValue() : this._pending.first().longValue();
            if (longValue != this._committedTo.longValue()) {
                KafkaSpout.this._state.setData(committedPath(), new ZooMeta(KafkaSpout.this._uuid, longValue));
                this._committedTo = Long.valueOf(longValue);
            }
        }

        private long actualOffset(MessageAndOffset messageAndOffset) {
            return messageAndOffset.offset() - messageAndOffset.message().serializedSize();
        }

        private String committedPath() {
            return KafkaSpout.this._spoutConfig.id + "/" + this._partition;
        }
    }

    /* loaded from: input_file:storm/kafka/KafkaSpout$ZooMeta.class */
    public static class ZooMeta implements Serializable {
        String id;
        long offset;

        public ZooMeta() {
        }

        public ZooMeta(String str, long j) {
            this.id = str;
            this.offset = j;
        }
    }

    public KafkaSpout(SpoutConfig spoutConfig) {
        this._spoutConfig = spoutConfig;
    }

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        LOG.info("Storm kafka spout opening");
        this._collector = spoutOutputCollector;
        HashMap hashMap = new HashMap(map);
        List<String> list = this._spoutConfig.zkServers;
        if (list == null) {
            list = (List) map.get(Config.STORM_ZOOKEEPER_SERVERS);
        }
        Integer num = this._spoutConfig.zkPort;
        if (num == null) {
            num = Integer.valueOf(((Number) map.get(Config.STORM_ZOOKEEPER_PORT)).intValue());
        }
        String str = this._spoutConfig.zkRoot;
        hashMap.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, list);
        hashMap.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, num);
        hashMap.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, str);
        Config config = new Config();
        config.registerSerialization(ZooMeta.class);
        LOG.info("Creating user state in zookeeper");
        this._state = TransactionalState.newUserState(hashMap, this._spoutConfig.id, config);
        LOG.info("Created user state in zookeeper");
        this._partitions = new KafkaPartitionConnections(this._spoutConfig);
        int size = this._spoutConfig.partitionsPerHost * this._spoutConfig.hosts.size();
        int size2 = topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
        LOG.info("Setting up partitions: " + size + " " + size2 + " " + topologyContext.getThisTaskIndex());
        int thisTaskIndex = topologyContext.getThisTaskIndex();
        while (true) {
            int i = thisTaskIndex;
            if (i >= size) {
                return;
            }
            this._managedPartitions.add(Integer.valueOf(i));
            this._managers.put(Integer.valueOf(i), new PartitionManager(i));
            thisTaskIndex = i + size2;
        }
    }

    public void nextTuple() {
        for (int i = 0; i < this._managedPartitions.size(); i++) {
            EmitState next = this._managers.get(Integer.valueOf(this._managedPartitions.get(this._currPartitionIndex).intValue())).next();
            if (next != EmitState.EMITTED_MORE_LEFT) {
                this._currPartitionIndex = (this._currPartitionIndex + 1) % this._managedPartitions.size();
            }
            if (next != EmitState.NO_EMITTED) {
                break;
            }
        }
        if (System.currentTimeMillis() - this._lastUpdateMs > this._spoutConfig.stateUpdateIntervalMs) {
            commit();
        }
    }

    public void ack(Object obj) {
        KafkaMessageId kafkaMessageId = (KafkaMessageId) obj;
        this._managers.get(Integer.valueOf(kafkaMessageId.partition)).ack(Long.valueOf(kafkaMessageId.offset));
    }

    public void fail(Object obj) {
        KafkaMessageId kafkaMessageId = (KafkaMessageId) obj;
        this._managers.get(Integer.valueOf(kafkaMessageId.partition)).fail(Long.valueOf(kafkaMessageId.offset));
    }

    public void deactivate() {
        commit();
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(this._spoutConfig.scheme.getOutputFields());
    }

    private void commit() {
        this._lastUpdateMs = System.currentTimeMillis();
        Iterator<PartitionManager> it = this._managers.values().iterator();
        while (it.hasNext()) {
            it.next().commit();
        }
    }

    public static void main(String[] strArr) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        ArrayList arrayList = new ArrayList();
        arrayList.add("localhost");
        SpoutConfig fromHostStrings = SpoutConfig.fromHostStrings(arrayList, 8, "clicks", "/kafkastorm", "id");
        fromHostStrings.scheme = new StringScheme();
        fromHostStrings.forceStartOffsetTime(-2L);
        topologyBuilder.setSpout("spout", new KafkaSpout(fromHostStrings), 3);
        new LocalCluster().submitTopology("kafka-test", new Config(), topologyBuilder.createTopology());
        Utils.sleep(600000L);
    }
}
