package crux.kafka.connect;

import clojure.java.api.Clojure;
import clojure.lang.IFn;
import crux.api.Crux;
import crux.api.ICruxAPI;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

/* loaded from: input_file:crux/kafka/connect/CruxSourceTask.class */
public class CruxSourceTask extends SourceTask {
    private Map<String, String> props;
    private ICruxAPI api;
    private Map<String, ?> sourceOffset;
    private static IFn pollSourceRecords;

    public String version() {
        return new CruxSourceConnector().version();
    }

    public void start(Map<String, String> map) {
        this.props = map;
        this.api = Crux.newApiClient(map.get("url"));
    }

    public List<SourceRecord> poll() throws InterruptedException {
        if (this.sourceOffset == null) {
            this.sourceOffset = this.context.offsetStorageReader().offset(Collections.singletonMap("url", this.props.get("url")));
        }
        List<SourceRecord> list = (List) pollSourceRecords.invoke(this.api, this.sourceOffset, this.props);
        if (!list.isEmpty()) {
            this.sourceOffset = list.get(list.size() - 1).sourceOffset();
        }
        return list;
    }

    public void stop() {
        if (this.api != null) {
            try {
                this.api.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    static {
        Clojure.var("clojure.core/require").invoke(Clojure.read("crux.kafka.connect"));
        pollSourceRecords = Clojure.var("crux.kafka.connect/poll-source-records");
    }
}
