package com.hbase.haxwell;

import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.JsonMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hbase/haxwell/HaxwellRegionObserver.class */
public class HaxwellRegionObserver extends BaseRegionObserver {
    private static final Logger log = LoggerFactory.getLogger(HaxwellRegionObserver.class);
    private KafkaMessageProducer messageProducer;
    private String topic;
    private boolean enableDelete;

    public void start(CoprocessorEnvironment coprocessorEnvironment) throws IOException {
        String str = coprocessorEnvironment.getConfiguration().get("haxwell.kafka.bootstrap.servers");
        String str2 = coprocessorEnvironment.getConfiguration().get("haxwell.kafka.acks", "all");
        int i = coprocessorEnvironment.getConfiguration().getInt("haxwell.kafka.retries", 3);
        int i2 = coprocessorEnvironment.getConfiguration().getInt("haxwell.kafka.batch.size", 1);
        int i3 = coprocessorEnvironment.getConfiguration().getInt("haxwell.kafka.autocommit.interval.ms", 30000);
        this.topic = coprocessorEnvironment.getConfiguration().get("haxwell.kafka.topic");
        this.enableDelete = coprocessorEnvironment.getConfiguration().getBoolean("haxwell.delete.enable", false);
        this.messageProducer = new KafkaMessageProducer(str, str2, i, i2, i3);
    }

    public void postPut(ObserverContext<RegionCoprocessorEnvironment> observerContext, Put put, WALEdit wALEdit, Durability durability) throws IOException {
        sendData(observerContext, put.getId(), put.toMap(Integer.MAX_VALUE), "PUT");
    }

    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> observerContext, Delete delete, WALEdit wALEdit, Durability durability) throws IOException {
        if (this.enableDelete) {
            sendData(observerContext, delete.getId(), delete.toMap(Integer.MAX_VALUE), "DELETE");
        }
    }

    private void sendData(ObserverContext<RegionCoprocessorEnvironment> observerContext, String str, Map<String, Object> map, String str2) throws IOException {
        String nameAsString = observerContext.getEnvironment().getRegion().getTableDesc().getNameAsString();
        if (log.isDebugEnabled()) {
            log.debug("Table Name: {} | Data: {}", nameAsString, JsonMapper.writeMapAsString(map));
        }
        try {
            this.messageProducer.send(this.topic, str, JsonMapper.writeMapAsString(ImmutableMap.builder().put("table", nameAsString).put("operation", str2).put("id", str).put("data", map).build()));
        } catch (Exception e) {
            log.error("Error sending event to Kafka", e);
            throw new IOException(e);
        }
    }
}
