package com.hbase.haxwell;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.hbase.haxwell.api.HaxwellEvent;
import com.hbase.haxwell.api.HaxwellEventListener;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/hbase/haxwell/HaxwellEventExecutor.class */
public class HaxwellEventExecutor {
    private HaxwellEventListener eventListener;
    private int numThreads;
    private int batchSize;
    private HaxwellMetrics HaxwellMetrics;
    private List<ThreadPoolExecutor> executors;
    private Multimap<Integer, HaxwellEvent> eventBuffers;
    private Log log = LogFactory.getLog(getClass());
    private HashFunction hashFunction = Hashing.murmur3_32();
    private boolean stopped = false;
    private List<Future<?>> futures = Lists.newArrayList();

    public HaxwellEventExecutor(HaxwellEventListener haxwellEventListener, List<ThreadPoolExecutor> list, int i, HaxwellMetrics haxwellMetrics) {
        this.eventListener = haxwellEventListener;
        this.executors = list;
        this.numThreads = list.size();
        this.batchSize = i;
        this.HaxwellMetrics = haxwellMetrics;
        this.eventBuffers = ArrayListMultimap.create(this.numThreads, i);
    }

    public void scheduleHaxwellEvent(HaxwellEvent haxwellEvent) {
        if (this.stopped) {
            throw new IllegalStateException("This executor is stopped");
        }
        int asInt = (this.hashFunction.hashBytes(haxwellEvent.getRow()).asInt() & Integer.MAX_VALUE) % this.numThreads;
        List list = (List) this.eventBuffers.get(Integer.valueOf(asInt));
        list.add(haxwellEvent);
        if (list.size() == this.batchSize) {
            scheduleEventBatch(asInt, Lists.newArrayList(list));
            this.eventBuffers.removeAll(Integer.valueOf(asInt));
        }
    }

    private void scheduleEventBatch(int i, final List<HaxwellEvent> list) {
        this.futures.add(this.executors.get(i).submit(new Runnable() { // from class: com.hbase.haxwell.HaxwellEventExecutor.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    long currentTimeMillis = System.currentTimeMillis();
                    HaxwellEventExecutor.this.log.debug("Delivering message to listener");
                    HaxwellEventExecutor.this.eventListener.processEvents(list);
                    HaxwellEventExecutor.this.HaxwellMetrics.reportFilteredSepOperation(System.currentTimeMillis() - currentTimeMillis);
                } catch (RuntimeException e) {
                    HaxwellEventExecutor.this.log.error("Error while processing event", e);
                    throw e;
                }
            }
        }));
    }

    public List<Future<?>> flush() {
        Iterator it = this.eventBuffers.keySet().iterator();
        while (it.hasNext()) {
            int intValue = ((Integer) it.next()).intValue();
            List list = (List) this.eventBuffers.get(Integer.valueOf(intValue));
            if (!list.isEmpty()) {
                scheduleEventBatch(intValue, Lists.newArrayList(list));
            }
        }
        this.eventBuffers.clear();
        return Lists.newArrayList(this.futures);
    }
}
