package storm.kafka;

import backtype.storm.task.IMetricsContext;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.trident.GlobalPartitionInformation;

/* loaded from: input_file:storm/kafka/ZkCoordinator.class */
public class ZkCoordinator implements PartitionCoordinator {
    public static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class);
    SpoutConfig _spoutConfig;
    int _taskIndex;
    int _totalTasks;
    String _topologyInstanceId;
    List<PartitionManager> _cachedList;
    int _refreshFreqMs;
    DynamicPartitionConnections _connections;
    DynamicBrokersReader _reader;
    ZkState _state;
    Map _stormConf;
    IMetricsContext _metricsContext;
    Map<Partition, PartitionManager> _managers = new HashMap();
    Long _lastRefreshTime = null;

    public ZkCoordinator(DynamicPartitionConnections dynamicPartitionConnections, Map map, SpoutConfig spoutConfig, ZkState zkState, int i, int i2, String str) {
        this._spoutConfig = spoutConfig;
        this._connections = dynamicPartitionConnections;
        this._taskIndex = i;
        this._totalTasks = i2;
        this._topologyInstanceId = str;
        this._stormConf = map;
        this._state = zkState;
        ZkHosts zkHosts = (ZkHosts) spoutConfig.hosts;
        this._refreshFreqMs = zkHosts.refreshFreqSecs * 1000;
        this._reader = new DynamicBrokersReader(map, zkHosts.brokerZkStr, zkHosts.brokerZkPath, spoutConfig.topic);
    }

    @Override // storm.kafka.PartitionCoordinator
    public List<PartitionManager> getMyManagedPartitions() {
        if (this._lastRefreshTime == null || System.currentTimeMillis() - this._lastRefreshTime.longValue() > this._refreshFreqMs) {
            refresh();
            this._lastRefreshTime = Long.valueOf(System.currentTimeMillis());
        }
        return this._cachedList;
    }

    void refresh() {
        try {
            LOG.info("Refreshing partition manager connections");
            GlobalPartitionInformation brokerInfo = this._reader.getBrokerInfo();
            HashSet hashSet = new HashSet();
            Iterator<Partition> it = brokerInfo.iterator();
            while (it.hasNext()) {
                Partition next = it.next();
                if (myOwnership(next)) {
                    hashSet.add(next);
                }
            }
            Set<Partition> keySet = this._managers.keySet();
            HashSet<Partition> hashSet2 = new HashSet(hashSet);
            hashSet2.removeAll(keySet);
            HashSet hashSet3 = new HashSet(keySet);
            hashSet3.removeAll(hashSet);
            LOG.info("Deleted partition managers: " + hashSet3.toString());
            Iterator it2 = hashSet3.iterator();
            while (it2.hasNext()) {
                this._managers.remove((Partition) it2.next()).close();
            }
            LOG.info("New partition managers: " + hashSet2.toString());
            for (Partition partition : hashSet2) {
                this._managers.put(partition, new PartitionManager(this._connections, this._topologyInstanceId, this._state, this._stormConf, this._spoutConfig, partition));
            }
            this._cachedList = new ArrayList(this._managers.values());
            LOG.info("Finished refreshing");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override // storm.kafka.PartitionCoordinator
    public PartitionManager getManager(Partition partition) {
        return this._managers.get(partition);
    }

    private boolean myOwnership(Partition partition) {
        return Math.abs(partition.host.hashCode() + (23 * partition.partition)) % this._totalTasks == this._taskIndex;
    }
}
