package storm.kafka;

import backtype.storm.Config;
import backtype.storm.utils.Utils;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.retry.RetryNTimes;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
import storm.kafka.KafkaConfig;

/* loaded from: input_file:storm/kafka/ZkCoordinator.class */
public class ZkCoordinator implements PartitionCoordinator {
    public static Logger LOG = Logger.getLogger(ZkCoordinator.class);
    SpoutConfig _spoutConfig;
    int _taskIndex;
    int _totalTasks;
    String _topologyInstanceId;
    CuratorFramework _curator;
    List<PartitionManager> _cachedList;
    int _refreshFreqMs;
    KafkaConfig.ZkHosts _brokerConf;
    DynamicPartitionConnections _connections;
    ZkState _state;
    Map _stormConf;
    Map<GlobalPartitionId, PartitionManager> _managers = new HashMap();
    Long _lastRefreshTime = null;

    public ZkCoordinator(DynamicPartitionConnections dynamicPartitionConnections, Map map, SpoutConfig spoutConfig, ZkState zkState, int i, int i2, String str) {
        this._spoutConfig = spoutConfig;
        this._connections = dynamicPartitionConnections;
        this._taskIndex = i;
        this._totalTasks = i2;
        this._topologyInstanceId = str;
        this._stormConf = map;
        this._state = zkState;
        this._brokerConf = (KafkaConfig.ZkHosts) this._spoutConfig.hosts;
        this._refreshFreqMs = this._brokerConf.refreshFreqSecs * 1000;
        try {
            this._curator = CuratorFrameworkFactory.newClient(this._brokerConf.brokerZkStr, Utils.getInt(map.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)).intValue(), 15000, new RetryNTimes(Utils.getInt(map.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)).intValue(), Utils.getInt(map.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)).intValue()));
            this._curator.start();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // storm.kafka.PartitionCoordinator
    public List<PartitionManager> getMyManagedPartitions() {
        if (this._lastRefreshTime == null || System.currentTimeMillis() - this._lastRefreshTime.longValue() > this._refreshFreqMs) {
            refresh();
            this._lastRefreshTime = Long.valueOf(System.currentTimeMillis());
        }
        return this._cachedList;
    }

    void refresh() {
        try {
            LOG.info("Refreshing partition manager connections");
            String str = this._brokerConf.brokerZkPath + "/topics/" + this._spoutConfig.topic;
            String str2 = this._brokerConf.brokerZkPath + "/ids";
            List<String> list = (List) this._curator.getChildren().forPath(str);
            HashSet hashSet = new HashSet();
            for (String str3 : list) {
                try {
                    byte[] bArr = (byte[]) this._curator.getData().forPath(str + "/" + str3);
                    HostPort brokerHost = getBrokerHost((byte[]) this._curator.getData().forPath(str2 + "/" + str3));
                    int numPartitions = getNumPartitions(bArr);
                    for (int i = 0; i < numPartitions; i++) {
                        GlobalPartitionId globalPartitionId = new GlobalPartitionId(brokerHost, i);
                        if (myOwnership(globalPartitionId)) {
                            hashSet.add(globalPartitionId);
                        }
                    }
                } catch (KeeperException.NoNodeException e) {
                }
            }
            Set<GlobalPartitionId> keySet = this._managers.keySet();
            HashSet<GlobalPartitionId> hashSet2 = new HashSet(hashSet);
            hashSet2.removeAll(keySet);
            HashSet hashSet3 = new HashSet(keySet);
            hashSet3.removeAll(hashSet);
            LOG.info("Deleted partition managers: " + hashSet3.toString());
            Iterator it = hashSet3.iterator();
            while (it.hasNext()) {
                this._managers.remove((GlobalPartitionId) it.next()).close();
            }
            LOG.info("New partition managers: " + hashSet2.toString());
            for (GlobalPartitionId globalPartitionId2 : hashSet2) {
                this._managers.put(globalPartitionId2, new PartitionManager(this._connections, this._topologyInstanceId, this._state, this._stormConf, this._spoutConfig, globalPartitionId2));
            }
            this._cachedList = new ArrayList(this._managers.values());
            LOG.info("Finished refreshing");
        } catch (Exception e2) {
            throw new RuntimeException(e2);
        }
    }

    @Override // storm.kafka.PartitionCoordinator
    public PartitionManager getManager(GlobalPartitionId globalPartitionId) {
        return this._managers.get(globalPartitionId);
    }

    private boolean myOwnership(GlobalPartitionId globalPartitionId) {
        return Math.abs(globalPartitionId.host.hashCode() + (23 * globalPartitionId.partition)) % this._totalTasks == this._taskIndex;
    }

    private static HostPort getBrokerHost(byte[] bArr) {
        try {
            String[] split = new String(bArr, "UTF-8").split(":");
            return new HostPort(split[split.length - 2], Integer.parseInt(split[split.length - 1]));
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    private static int getNumPartitions(byte[] bArr) {
        try {
            return Integer.parseInt(new String(bArr, "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }
}
