package org.trpr.platform.batch.impl.spring.reader;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;
import org.trpr.platform.batch.impl.spring.partitioner.SimpleRangePartitioner;
import org.trpr.platform.batch.spi.spring.reader.BatchItemStreamReader;
import org.trpr.platform.core.impl.logging.LogFactory;
import org.trpr.platform.core.spi.logging.Logger;

/* loaded from: input_file:org/trpr/platform/batch/impl/spring/reader/CompositeItemStreamReader.class */
/**
 * A {@link BatchItemStreamReader} that serves items one at a time from a local cache which is
 * filled by invoking {@link BatchItemStreamReader#batchRead(ExecutionContext)} on a delegate,
 * once for every {@link ExecutionContext} registered through {@link #open(ExecutionContext)}.
 *
 * <p>A {@link CountDownLatch} tracks the outstanding partition reads. A caller of {@link #read()}
 * that finds neither cached items nor an unread partition waits on that latch (bounded by
 * {@link #getBatchReadTimeout()} seconds) before concluding that the data is exhausted.
 *
 * <p>Locking: the monitor of this instance guards only the bookkeeping on the local cache, the
 * pending context list and the latch reference. It is deliberately NOT held while the delegate
 * is reading or while waiting on the latch, so that other threads can keep draining the cache
 * and reading their own partitions in the meantime.
 *
 * @param <T> the type of item returned by this reader
 */
public class CompositeItemStreamReader<T> implements BatchItemStreamReader<T>, InitializingBean {
    /** Default upper bound, in seconds, for waiting on outstanding partition reads. */
    private static final int DEFAULT_BATCH_READ_TIMEOUT = 60;
    private static final Logger LOGGER = LogFactory.getLogger(CompositeItemStreamReader.class);

    /** The reader that performs the actual per-partition batch reads. */
    private final BatchItemStreamReader<T> delegate;
    /** Tracks partition reads that have not completed yet; created/replaced in open(). Guarded by this. */
    private CountDownLatch countDownLatch;
    /** Items fetched by the delegate and not yet handed out by read(). */
    private final Queue<T> localQueue = new ConcurrentLinkedQueue<T>();
    /** Execution contexts registered via open() whose batch read has not been started yet. */
    private final Queue<ExecutionContext> contextList = new ConcurrentLinkedQueue<ExecutionContext>();
    /** Maximum time, in seconds, that read() waits for outstanding partition reads. */
    private int batchReadTimeout = DEFAULT_BATCH_READ_TIMEOUT;

    /**
     * Creates a reader that fetches its data through the given delegate.
     *
     * @param batchItemStreamReader the delegate used for batch reads; checked for null in
     *                              {@link #afterPropertiesSet()}
     */
    public CompositeItemStreamReader(BatchItemStreamReader<T> batchItemStreamReader) {
        this.delegate = batchItemStreamReader;
    }

    /**
     * Returns the next available item.
     *
     * <p>Order of attempts: (1) an item from the local cache; (2) a batch read on the next pending
     * partition context, followed by an item from the cache; (3) a bounded wait for the partition
     * reads still in flight, followed by a final look at the cache.
     *
     * <p>An {@link Exception} thrown by the delegate's batch read is logged and treated as an empty
     * result for that partition. The latch is counted down exactly once per partition read,
     * whether it succeeded or failed.
     *
     * @return the next item, or {@code null} when no data is available
     * @throws Exception if the wait for outstanding partition reads is interrupted
     */
    public T read() throws Exception, UnexpectedInputException, ParseException {
        T item = pollLocalQueue("Returning data from local cache. Cache size : ");
        if (item != null) {
            return item;
        }

        ExecutionContext partitionContext;
        CountDownLatch latch;
        synchronized (this) {
            partitionContext = this.contextList.poll();
            // Snapshot the latch together with the context so this read counts down the latch
            // that was current when its partition was taken, even if open() replaces it later.
            latch = this.countDownLatch;
        }

        if (partitionContext != null) {
            LOGGER.debug("Invoking batch read on partition");
            try {
                // Intentionally outside any lock: this is the slow part and must not block
                // other threads from reading their own partitions or draining the cache.
                T[] batchRead = this.delegate.batchRead(partitionContext);
                if (batchRead != null) {
                    synchronized (this) {
                        for (T t : batchRead) {
                            if (t != null) {
                                this.localQueue.add(t);
                            }
                        }
                    }
                }
            } catch (Exception e) {
                // Best effort: a failed partition contributes no items but must not stop the others.
                LOGGER.warn("Batch read failed for partition. Error is : {}", e.getMessage(), e);
            } finally {
                // Exactly one count-down per partition, on success and on failure alike.
                latch.countDown();
            }
            item = pollLocalQueue("Returning data from local cache after partition read. Cache size : ");
            if (item != null) {
                return item;
            }
        }

        // No cached data and no partition left to start: wait for the reads still in flight.
        // A null latch means open() was never called, so there is nothing to wait for.
        if (latch != null && !latch.await(getBatchReadTimeout(), TimeUnit.SECONDS)) {
            LOGGER.info("Count down latch timeout occurred before completion. Counting down to zero and clearing the context list!");
            this.contextList.clear();
            while (latch.getCount() > 0) {
                latch.countDown();
            }
        }

        item = pollLocalQueue("Returning data from local cache after re-check. Cache size : ");
        if (item == null) {
            LOGGER.debug("No more data to read. Returning null");
        }
        return item;
    }

    /**
     * Atomically checks the local cache and removes its head, logging the cache size first.
     *
     * @param logPrefix debug message prefix; the current cache size is appended to it
     * @return the head of the cache, or {@code null} if the cache is empty
     */
    private T pollLocalQueue(String logPrefix) {
        synchronized (this) {
            if (this.localQueue.isEmpty()) {
                return null;
            }
            LOGGER.debug(logPrefix + this.localQueue.size());
            return this.localQueue.poll();
        }
    }

    /**
     * Not supported on the composite reader.
     *
     * @throws UnsupportedOperationException always; use {@link #read()} instead
     */
    @Override // org.trpr.platform.batch.spi.spring.reader.BatchItemStreamReader
    public T[] batchRead(ExecutionContext executionContext) throws Exception, UnexpectedInputException, ParseException {
        throw new UnsupportedOperationException("Illegal invocation of batchRead(), call read() instead.");
    }

    /** Closes the delegate. */
    public void close() throws ItemStreamException {
        this.delegate.close();
    }

    /**
     * Registers the given execution context as a partition that is pending a batch read.
     *
     * <p>If there is no latch yet, or the current latch has already reached zero, a new latch is
     * created. Its count is read from the context under
     * {@link SimpleRangePartitioner#TOTAL_PARTITIIONS}, defaulting to 1.
     *
     * @param executionContext the context describing the partition to read
     */
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        LOGGER.debug("Called open for a partition");
        // Registering the context and (re)creating the latch happen as one step, so concurrent
        // open() calls cannot each install their own latch and read() never sees a context
        // without its latch.
        synchronized (this) {
            this.contextList.add(executionContext);
            if (this.countDownLatch == null || this.countDownLatch.getCount() == 0) {
                this.countDownLatch = new CountDownLatch(executionContext.getInt(SimpleRangePartitioner.TOTAL_PARTITIIONS, 1));
            }
        }
    }

    /** Forwards the update call to the delegate. */
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        this.delegate.update(executionContext);
    }

    /**
     * Verifies that a delegate has been supplied.
     *
     * @throws Exception if the delegate is {@code null}
     */
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(this.delegate, "The 'delegate' may not be null");
    }

    /** @return the delegate used for batch reads */
    public BatchItemStreamReader<T> getDelegate() {
        return this.delegate;
    }

    /** @return the maximum time, in seconds, that {@link #read()} waits for outstanding partition reads */
    public int getBatchReadTimeout() {
        return this.batchReadTimeout;
    }

    /** @param i the maximum time, in seconds, that {@link #read()} waits for outstanding partition reads */
    public void setBatchReadTimeout(int i) {
        this.batchReadTimeout = i;
    }
}
