package yieldbot.storm.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;

/* loaded from: input_file:yieldbot/storm/spout/RedisPubSubSpout.class */
public class RedisPubSubSpout extends BaseRichSpout {
    static final long serialVersionUID = 737015318988609460L;
    static Logger LOG = Logger.getLogger(RedisPubSubSpout.class);
    SpoutOutputCollector _collector;
    final String host;
    final int port;
    final String pattern;
    LinkedBlockingQueue<String> queue;
    JedisPool pool;

    /* loaded from: input_file:yieldbot/storm/spout/RedisPubSubSpout$ListenerThread.class */
    class ListenerThread extends Thread {
        LinkedBlockingQueue<String> queue;
        JedisPool pool;
        String pattern;

        public ListenerThread(LinkedBlockingQueue<String> linkedBlockingQueue, JedisPool jedisPool, String str) {
            this.queue = linkedBlockingQueue;
            this.pool = jedisPool;
            this.pattern = str;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            JedisPubSub jedisPubSub = new JedisPubSub() { // from class: yieldbot.storm.spout.RedisPubSubSpout.ListenerThread.1
                public void onMessage(String str, String str2) {
                    ListenerThread.this.queue.offer(str2);
                }

                public void onPMessage(String str, String str2, String str3) {
                    ListenerThread.this.queue.offer(str3);
                }

                public void onPSubscribe(String str, int i) {
                }

                public void onPUnsubscribe(String str, int i) {
                }

                public void onSubscribe(String str, int i) {
                }

                public void onUnsubscribe(String str, int i) {
                }
            };
            Jedis jedis = (Jedis) this.pool.getResource();
            try {
                jedis.psubscribe(jedisPubSub, new String[]{this.pattern});
                this.pool.returnResource(jedis);
            } catch (Throwable th) {
                this.pool.returnResource(jedis);
                throw th;
            }
        }
    }

    public RedisPubSubSpout(String str, int i, String str2) {
        this.host = str;
        this.port = i;
        this.pattern = str2;
    }

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this._collector = spoutOutputCollector;
        this.queue = new LinkedBlockingQueue<>(1000);
        this.pool = new JedisPool(new JedisPoolConfig(), this.host, this.port);
        new ListenerThread(this.queue, this.pool, this.pattern).start();
    }

    public void close() {
        this.pool.destroy();
    }

    public void nextTuple() {
        String poll = this.queue.poll();
        if (poll == null) {
            Utils.sleep(50L);
        } else {
            this._collector.emit(Utils.tuple(new Object[]{poll}));
        }
    }

    public void ack(Object obj) {
    }

    public void fail(Object obj) {
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(new String[]{"message"}));
    }

    public boolean isDistributed() {
        return false;
    }
}
