package nginx.clojure.util;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import nginx.clojure.AppEventListenerManager;
import nginx.clojure.MiniConstants;
import nginx.clojure.NginxClojureRT;
import nginx.clojure.logger.LoggerService;

/* loaded from: input_file:nginx/clojure/util/NginxPubSubTopic.class */
public class NginxPubSubTopic {
    protected String topic;
    protected long topicId;
    public static final String PUBSUB_SHARED_MAP_NAME = "PubSubTopic";
    public static final String PUBSUB_TOPIC_ID_COUNTER = "PUBSUB_TOPIC_ID_COUNTER";
    public static final long PUBSUB_EVENT_ID_COUNTER = 0;
    private static final long PUBSUB_MAX_TOPIC_ID = 9007199254740991L;
    protected static NginxSharedHashMap<Object, Object> sharedBox;
    protected static ConcurrentHashMap<Long, Set<PubSubListenerData>> topicSubs = new ConcurrentHashMap<>();
    static final LoggerService logger = NginxClojureRT.getLog();

    /* loaded from: input_file:nginx/clojure/util/NginxPubSubTopic$PubSubListenerData.class */
    public static class PubSubListenerData<T> {
        public NginxPubSubListener<T> listener;
        public T data;

        public PubSubListenerData() {
        }

        public PubSubListenerData(NginxPubSubListener<T> nginxPubSubListener, T t) {
            this.listener = nginxPubSubListener;
            this.data = t;
        }
    }

    private static void initSharedBox() {
        sharedBox = NginxSharedHashMap.build(PUBSUB_SHARED_MAP_NAME);
        if (sharedBox == null) {
            NginxClojureRT.getLog().error("can not find shared map 'PubSubTopic' without which NginxPubSubTopic can't work!");
        } else {
            sharedBox.putIntIfAbsent(0L, 1);
            NginxClojureRT.getAppEventListenerManager().addListener(new AppEventListenerManager.Listener() { // from class: nginx.clojure.util.NginxPubSubTopic.1
                @Override // nginx.clojure.AppEventListenerManager.Listener
                public void onEvent(AppEventListenerManager.PostedEvent postedEvent) throws IOException {
                    if (postedEvent.tag == 30) {
                        long longValue = ((Long) postedEvent.data).longValue();
                        long j = longValue | 4294967296L;
                        String str = (String) NginxPubSubTopic.sharedBox.get(Long.valueOf(longValue));
                        long j2 = NginxPubSubTopic.sharedBox.getLong(Long.valueOf(j)) >> 10;
                        long atomicAddLong = NginxPubSubTopic.sharedBox.atomicAddLong(Long.valueOf(j), -1L);
                        if (NginxPubSubTopic.logger.isDebugEnabled()) {
                            NginxPubSubTopic.logger.debug("handle pub post event, id=%x, rid=%x, message=%s, topicId=%x, counter=%x,%x", Long.valueOf(longValue), Long.valueOf(j), str, Long.valueOf(j2), Long.valueOf(atomicAddLong), Long.valueOf(atomicAddLong & 1023));
                        }
                        if ((atomicAddLong & 1023) == 1) {
                            NginxPubSubTopic.sharedBox.delete(Long.valueOf(j));
                            NginxPubSubTopic.sharedBox.delete(Long.valueOf(longValue));
                        }
                        Set<PubSubListenerData> set = NginxPubSubTopic.topicSubs.get(Long.valueOf(j2));
                        if (set == null) {
                            NginxClojureRT.getLog().debug("no sub found about topic %d", Long.valueOf(j2));
                            return;
                        }
                        for (PubSubListenerData pubSubListenerData : set) {
                            try {
                                pubSubListenerData.listener.onMessage(str, pubSubListenerData.data);
                            } catch (Throwable th) {
                                NginxClojureRT.getLog().warn("error on pubsub event, message=%s", th);
                            }
                        }
                    }
                }
            });
        }
    }

    public NginxPubSubTopic(String str) {
        if (sharedBox == null) {
            synchronized (topicSubs) {
                initSharedBox();
            }
        }
        this.topic = str;
        if (sharedBox == null) {
            throw new RuntimeException("can not find shared map 'PubSubTopic' without which NginxPubSubTopic can't work!");
        }
        Object obj = sharedBox.get(str);
        if (obj != null) {
            this.topicId = ((Long) obj).longValue();
            return;
        }
        if (sharedBox.putIfAbsent(PUBSUB_TOPIC_ID_COUNTER, 2L) == null) {
            this.topicId = 1L;
        } else {
            this.topicId = sharedBox.atomicAddLong(PUBSUB_TOPIC_ID_COUNTER, 1L);
            if (this.topicId > PUBSUB_MAX_TOPIC_ID) {
                throw new RuntimeException("too many topics! nginx-clojure can not support > 9007199254740991 topics");
            }
        }
        Object putIfAbsent = sharedBox.putIfAbsent(str, Long.valueOf(this.topicId));
        if (putIfAbsent != null) {
            this.topicId = ((Long) putIfAbsent).longValue();
        }
    }

    public void publish(String str) {
        long atomicAddInt = sharedBox.atomicAddInt(0L, 1) & 4294967295L;
        sharedBox.put(Long.valueOf(atomicAddInt), str);
        sharedBox.putLong(Long.valueOf(atomicAddInt | 4294967296L), MiniConstants.NGX_WORKER_PROCESSORS_NUM | (this.topicId << 10));
        if (logger.isDebugEnabled()) {
            long j = MiniConstants.NGX_WORKER_PROCESSORS_NUM | (this.topicId << 10);
            logger.debug("pub id=%x, rid=%x, counter=%x, %x", Long.valueOf(atomicAddInt), Long.valueOf(atomicAddInt | 4294967296L), Long.valueOf(j), Long.valueOf(j & 1023));
        }
        NginxClojureRT.broadcastEvent(30L, atomicAddInt);
    }

    public <T> PubSubListenerData<T> subscribe(T t, NginxPubSubListener<T> nginxPubSubListener) {
        Set<PubSubListenerData> set = topicSubs.get(Long.valueOf(this.topicId));
        if (set == null) {
            ConcurrentHashMap<Long, Set<PubSubListenerData>> concurrentHashMap = topicSubs;
            Long valueOf = Long.valueOf(this.topicId);
            Set<PubSubListenerData> newSetFromMap = Collections.newSetFromMap(new ConcurrentHashMap());
            set = newSetFromMap;
            Set<PubSubListenerData> putIfAbsent = concurrentHashMap.putIfAbsent(valueOf, newSetFromMap);
            if (putIfAbsent != null) {
                set = putIfAbsent;
            }
        }
        PubSubListenerData<T> pubSubListenerData = new PubSubListenerData<>(nginxPubSubListener, t);
        set.add(pubSubListenerData);
        return pubSubListenerData;
    }

    public void unsubscribe(PubSubListenerData pubSubListenerData) {
        Set<PubSubListenerData> set = topicSubs.get(Long.valueOf(this.topicId));
        if (set != null) {
            set.remove(pubSubListenerData);
        }
    }

    public void destory() {
        sharedBox.delete(this.topic);
        topicSubs.remove(Long.valueOf(this.topicId));
    }
}
