package kd.fi.v2.fah.utils.pipe;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import kd.fi.bd.indexing.constant.CDCStageEnum;
import kd.fi.bd.util.exception.IExceptionListener;
import kd.fi.v2.fah.log.ILogHandler;
import kd.fi.v2.fah.storage.IDataItemKey;

/* loaded from: input_file:kd/fi/v2/fah/utils/pipe/FahAsyncStreamPipe.class */
public class FahAsyncStreamPipe<E> implements IDataItemKey<Object> {
    public static final int PIPE_NOT_RUNNING = 0;
    public static final int PIPE_STARTING = 1;
    public static final int PIPE_RUNNING = 2;
    public static final int PIPE_STOPPING = 3;
    private long ConsumerOnEmptyQueue_SleepTime;
    private int EmptyQueue_ReCheckTimes;
    protected ILogHandler logger;
    protected Object pipeItemKey;
    protected ConcurrentLinkedQueue<IFahIAsyncStreamDataBlock<E>> dataQueue;
    protected Function<Object, BiConsumer<Integer, IFahIAsyncStreamDataBlock>> consumerSupplier;
    protected AtomicBoolean pipeLock;
    protected AtomicInteger pipeStatus;
    protected AtomicInteger activeConsumerCnt;
    protected AtomicInteger waitingDataBlockCnt;
    protected IExceptionListener exceptionListener;
    protected int maxConsumerCnt;
    protected int queuePreConsumerThreshold;
    protected List<Future> consumerThreadRefs;
    protected Function<FahAsyncStreamPipe<E>.SimplePipeConsumerThread, Future> threadPoolFunc;

    /* loaded from: input_file:kd/fi/v2/fah/utils/pipe/FahAsyncStreamPipe$AbstractPipeConsumerThread.class */
    public abstract class AbstractPipeConsumerThread<DATA> implements Callable<Integer> {
        protected final int consumerIndex;
        protected final BiConsumer<Integer, DATA> consumer;

        protected AbstractPipeConsumerThread(int i, BiConsumer<Integer, DATA> biConsumer) {
            this.consumerIndex = i;
            this.consumer = biConsumer;
        }

        protected abstract int processQueueData(ConcurrentLinkedQueue<? extends IFahIAsyncStreamDataBlock<E>> concurrentLinkedQueue);

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Integer call() throws Exception {
            int decrementAndGet;
            int i = FahAsyncStreamPipe.this.EmptyQueue_ReCheckTimes;
            while (true) {
                FahAsyncStreamPipe.this.notifyActiveConsumerCntChanged();
                while (FahAsyncStreamPipe.this.isRunning() && i > 0) {
                    if (processQueueData(FahAsyncStreamPipe.this.dataQueue) <= 0) {
                        i--;
                    }
                }
                decrementAndGet = FahAsyncStreamPipe.this.activeConsumerCnt.decrementAndGet();
                FahAsyncStreamPipe.this.notifyActiveConsumerCntChanged();
                if (!FahAsyncStreamPipe.this.isRunning() || (FahAsyncStreamPipe.this.isEmpty() && FahAsyncStreamPipe.this.isRunning())) {
                    break;
                }
                i = FahAsyncStreamPipe.this.EmptyQueue_ReCheckTimes;
                FahAsyncStreamPipe.this.activeConsumerCnt.incrementAndGet();
            }
            return Integer.valueOf(decrementAndGet);
        }
    }

    /* loaded from: input_file:kd/fi/v2/fah/utils/pipe/FahAsyncStreamPipe$BatchProcessPipeConsumerThread.class */
    public class BatchProcessPipeConsumerThread extends FahAsyncStreamPipe<E>.AbstractPipeConsumerThread<List<IFahIAsyncStreamDataBlock<E>>> {
        protected int batchSize;
        protected List<IFahIAsyncStreamDataBlock<E>> __currentBatchCache;
        protected int __currentBatchCnt;

        protected BatchProcessPipeConsumerThread(int i, BiConsumer<Integer, List<IFahIAsyncStreamDataBlock<E>>> biConsumer, int i2) {
            super(i, biConsumer);
            this.__currentBatchCnt = 0;
            this.batchSize = i2;
            this.__currentBatchCache = new ArrayList(i2);
        }

        @Override // kd.fi.v2.fah.utils.pipe.FahAsyncStreamPipe.AbstractPipeConsumerThread
        protected int processQueueData(ConcurrentLinkedQueue<? extends IFahIAsyncStreamDataBlock<E>> concurrentLinkedQueue) {
            IFahIAsyncStreamDataBlock<E> poll;
            int i = 0;
            this.__currentBatchCnt = 0;
            this.__currentBatchCache.clear();
            while (this.__currentBatchCnt < this.batchSize && (poll = concurrentLinkedQueue.poll()) != null) {
                try {
                    try {
                        FahAsyncStreamPipe.this.waitingDataBlockCnt.decrementAndGet();
                        if (poll.lock()) {
                            this.__currentBatchCache.add(poll);
                            this.__currentBatchCnt++;
                        }
                    } catch (Exception e) {
                        FahAsyncStreamPipe.this.logger.error(e.getMessage(), e);
                        if (FahAsyncStreamPipe.this.exceptionListener != null) {
                            FahAsyncStreamPipe.this.exceptionListener.onError(e);
                        }
                        Iterator<IFahIAsyncStreamDataBlock<E>> it = this.__currentBatchCache.iterator();
                        while (it.hasNext()) {
                            it.next().onStageCompleted(CDCStageEnum.Completed, false);
                        }
                    }
                } finally {
                    Iterator<IFahIAsyncStreamDataBlock<E>> it2 = this.__currentBatchCache.iterator();
                    while (it2.hasNext()) {
                        it2.next().onStageCompleted(CDCStageEnum.Completed, true);
                    }
                }
            }
            if (this.__currentBatchCache.isEmpty()) {
                FahAsyncStreamPipe.this.__waitDataQueueChange(FahAsyncStreamPipe.this.ConsumerOnEmptyQueue_SleepTime);
            } else {
                FahAsyncStreamPipe.this.notifyWaitingDataBlockCntCntChanged();
                this.consumer.accept(Integer.valueOf(this.consumerIndex), this.__currentBatchCache);
                this.__currentBatchCache.forEach(iFahIAsyncStreamDataBlock -> {
                    iFahIAsyncStreamDataBlock.onStageCompleted(CDCStageEnum.Completed, true);
                });
                i = 0 + this.__currentBatchCache.size();
            }
            return i;
        }
    }

    /* loaded from: input_file:kd/fi/v2/fah/utils/pipe/FahAsyncStreamPipe$SimplePipeConsumerThread.class */
    public class SimplePipeConsumerThread extends FahAsyncStreamPipe<E>.AbstractPipeConsumerThread<IFahIAsyncStreamDataBlock<E>> {
        protected SimplePipeConsumerThread(int i, BiConsumer<Integer, IFahIAsyncStreamDataBlock<E>> biConsumer) {
            super(i, biConsumer);
        }

        @Override // kd.fi.v2.fah.utils.pipe.FahAsyncStreamPipe.AbstractPipeConsumerThread
        protected int processQueueData(ConcurrentLinkedQueue<? extends IFahIAsyncStreamDataBlock<E>> concurrentLinkedQueue) {
            int i = 0;
            IFahIAsyncStreamDataBlock iFahIAsyncStreamDataBlock = null;
            try {
                IFahIAsyncStreamDataBlock<E> poll = concurrentLinkedQueue.poll();
                if (poll != null) {
                    FahAsyncStreamPipe.this.waitingDataBlockCnt.decrementAndGet();
                    FahAsyncStreamPipe.this.notifyWaitingDataBlockCntCntChanged();
                    if (poll.lock()) {
                        poll.onStageCompleted(CDCStageEnum.Started, true);
                        this.consumer.accept(Integer.valueOf(this.consumerIndex), poll);
                        poll.onStageCompleted(CDCStageEnum.Completed, true);
                        poll.unlock();
                    }
                    i = 0 + 1;
                } else {
                    FahAsyncStreamPipe.this.__waitDataQueueChange(FahAsyncStreamPipe.this.ConsumerOnEmptyQueue_SleepTime);
                }
            } catch (Exception e) {
                FahAsyncStreamPipe.this.logger.error(e.getMessage(), e);
                if (FahAsyncStreamPipe.this.exceptionListener != null) {
                    FahAsyncStreamPipe.this.exceptionListener.onError(e);
                }
                if (0 != 0) {
                    iFahIAsyncStreamDataBlock.onStageCompleted(CDCStageEnum.Completed, false);
                }
            }
            return i;
        }
    }

    public FahAsyncStreamPipe(int i, Function<FahAsyncStreamPipe<E>.SimplePipeConsumerThread, Future> function) {
        this.ConsumerOnEmptyQueue_SleepTime = 10L;
        this.EmptyQueue_ReCheckTimes = 4;
        this.maxConsumerCnt = 10;
        this.queuePreConsumerThreshold = 5;
        this.dataQueue = new ConcurrentLinkedQueue<>();
        this.maxConsumerCnt = i;
        this.pipeLock = new AtomicBoolean(true);
        this.pipeStatus = new AtomicInteger(0);
        this.consumerThreadRefs = new LinkedList();
        this.threadPoolFunc = function;
        if (function == null) {
            throw new IllegalArgumentException("Pipe required Thread Pool cannot be null!");
        }
        this.activeConsumerCnt = new AtomicInteger(0);
        this.waitingDataBlockCnt = new AtomicInteger(0);
    }

    public FahAsyncStreamPipe(int i, Function<FahAsyncStreamPipe<E>.SimplePipeConsumerThread, Future> function, Function<Object, BiConsumer<Integer, IFahIAsyncStreamDataBlock>> function2) {
        this(i, function);
        setConsumerSupplier(function2);
    }

    public FahAsyncStreamPipe(int i, Function<FahAsyncStreamPipe<E>.SimplePipeConsumerThread, Future> function, BiConsumer<Integer, IFahIAsyncStreamDataBlock<E>> biConsumer) {
        this(i, function);
        setDataConsumer(biConsumer);
    }

    public synchronized void startPipe() {
        if (this.pipeStatus.get() == 0 && this.consumerSupplier != null) {
            this.pipeStatus.set(1);
        }
        this.dataQueue.clear();
        this.consumerThreadRefs.clear();
        this.activeConsumerCnt.set(0);
        this.waitingDataBlockCnt.set(0);
        if (this.pipeStatus.get() == 1) {
            __changePipeLock(false);
            updatePipeStatus(2);
        }
    }

    protected BiConsumer<Integer, IFahIAsyncStreamDataBlock<E>> getConsumer() {
        BiConsumer<Integer, IFahIAsyncStreamDataBlock> apply = this.consumerSupplier.apply(this.pipeItemKey);
        if (apply == null) {
            throw new IllegalArgumentException(String.format("(Pipe:%s) Consumer from Supplier cannot be null!", this.pipeItemKey));
        }
        return (num, iFahIAsyncStreamDataBlock) -> {
            apply.accept(num, iFahIAsyncStreamDataBlock);
        };
    }

    protected synchronized boolean checkAndStartConsumerTask() {
        if (!needStartNewConsumer(this.activeConsumerCnt.get(), this.waitingDataBlockCnt.get(), this.queuePreConsumerThreshold)) {
            return false;
        }
        this.consumerThreadRefs.add(this.threadPoolFunc.apply(new SimplePipeConsumerThread(this.activeConsumerCnt.getAndIncrement(), getConsumer())));
        return true;
    }

    protected boolean needStartNewConsumer(int i, int i2, int i3) {
        if (i2 <= 0 || i >= this.maxConsumerCnt) {
            return false;
        }
        return i <= 0 || i * i3 < i2;
    }

    protected IFahIAsyncStreamDataBlock<E> putToQueue(IFahIAsyncStreamDataBlock<E> iFahIAsyncStreamDataBlock, boolean z) {
        if (this.pipeLock.get() && !z) {
            return null;
        }
        while (this.pipeLock.get()) {
            synchronized (this.pipeLock) {
                try {
                    this.pipeLock.wait(200L);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        this.dataQueue.offer(iFahIAsyncStreamDataBlock);
        this.waitingDataBlockCnt.incrementAndGet();
        checkAndStartConsumerTask();
        notifyPipeQueueChanged();
        return iFahIAsyncStreamDataBlock;
    }

    protected IFahIAsyncStreamDataBlock<E> putToQueue(IFahIAsyncStreamDataBlock<E> iFahIAsyncStreamDataBlock) {
        return putToQueue((IFahIAsyncStreamDataBlock) iFahIAsyncStreamDataBlock, true);
    }

    public List<IFahIAsyncStreamDataBlock<E>> putToQueue(Object obj, Collection<E> collection, boolean z) {
        LinkedList linkedList = new LinkedList();
        Iterator<E> it = collection.iterator();
        while (it.hasNext()) {
            IFahIAsyncStreamDataBlock<E> putToQueue = putToQueue(obj, it.next(), z);
            if (putToQueue != null) {
                linkedList.add(putToQueue);
            }
        }
        return linkedList;
    }

    public List<IFahIAsyncStreamDataBlock<E>> putToQueue(Collection<E> collection, boolean z) {
        return putToQueue((Object) null, (Collection) collection, z);
    }

    public IFahIAsyncStreamDataBlock<E> putToQueue(Object obj, E e, boolean z) {
        return putToQueue((IFahIAsyncStreamDataBlock) new FahSimpleAsyncStreamDataBlock(obj, e), z);
    }

    public IFahIAsyncStreamDataBlock<E> putToQueue(E e, boolean z) {
        return putToQueue((Object) null, e, z);
    }

    public IFahIAsyncStreamDataBlock<E> putToQueue(Object obj, E e, int i, int i2, boolean z, boolean z2) {
        return putToQueue((IFahIAsyncStreamDataBlock) new FahSimpleAsyncStreamDataBlock(obj, e, i, i2, z), z2);
    }

    protected synchronized void clearDataQueue() {
        try {
            __changePipeLock(true);
            while (!this.dataQueue.isEmpty()) {
                IFahIAsyncStreamDataBlock<E> poll = this.dataQueue.poll();
                if (poll != null) {
                    poll.onStageCompleted(CDCStageEnum.Canceled, false);
                }
            }
            this.dataQueue.clear();
            notifyPipeQueueChanged();
            this.waitingDataBlockCnt.set(0);
        } finally {
            __changePipeLock(false);
            notifyWaitingDataBlockCntCntChanged();
        }
    }

    public void close(boolean z) {
        __changePipeLock(true);
        if (isRunning()) {
            if (z) {
                clearDataQueue();
            } else {
                notifyPipeQueueChanged();
                while (!this.dataQueue.isEmpty()) {
                    try {
                        synchronized (this.waitingDataBlockCnt) {
                            this.waitingDataBlockCnt.wait(500L);
                        }
                    } catch (Exception e) {
                        this.logger.error(e.getMessage(), e);
                    }
                }
            }
            updatePipeStatus(3);
            Iterator<Future> it = this.consumerThreadRefs.iterator();
            while (it.hasNext()) {
                try {
                    it.next().get();
                } catch (InterruptedException | ExecutionException e2) {
                }
            }
            updatePipeStatus(0);
        }
    }

    public void close() {
        close(false);
    }

    protected void updatePipeStatus(int i) {
        this.pipeStatus.set(i);
    }

    public boolean isStopped() {
        return this.pipeStatus.get() != 2;
    }

    public boolean isRunning() {
        return this.pipeStatus.get() == 2;
    }

    protected static void closeClosable(Closeable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (Exception e) {
            }
        }
    }

    public IFahIAsyncStreamDataBlock<E> peekFromQueue() {
        return this.dataQueue.peek();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int getWaitingDataBlockCnt() {
        return this.waitingDataBlockCnt.get();
    }

    public int getActiveConsumerCnt() {
        return this.activeConsumerCnt.get();
    }

    protected void __notifyCounterValueChanged(AtomicInteger atomicInteger) {
        synchronized (atomicInteger) {
            atomicInteger.notifyAll();
        }
    }

    protected void notifyActiveConsumerCntChanged() {
        __notifyCounterValueChanged(this.activeConsumerCnt);
    }

    protected void notifyWaitingDataBlockCntCntChanged() {
        __notifyCounterValueChanged(this.waitingDataBlockCnt);
    }

    protected void notifyPipeQueueChanged() {
        synchronized (this.dataQueue) {
            this.dataQueue.notifyAll();
        }
    }

    protected void __waitDataQueueChange(long j) throws InterruptedException {
        synchronized (this.dataQueue) {
            this.dataQueue.wait(j);
        }
    }

    protected synchronized void __changePipeLock(boolean z) {
        this.pipeLock.set(z);
        synchronized (this.pipeLock) {
            this.pipeLock.notifyAll();
        }
    }

    public int queueSize() {
        return this.dataQueue.size();
    }

    public boolean isEmpty() {
        return this.dataQueue.isEmpty();
    }

    public IExceptionListener getExceptionListener() {
        return this.exceptionListener;
    }

    public void setExceptionListener(IExceptionListener iExceptionListener) {
        this.exceptionListener = iExceptionListener;
    }

    public Object getPipeItemKey() {
        return this.pipeItemKey;
    }

    public void setPipeItemKey(Object obj) {
        this.pipeItemKey = obj;
    }

    public List<Future> getConsumerThreadRefs() {
        return this.consumerThreadRefs;
    }

    public void setConsumerThreadRefs(List<Future> list) {
        this.consumerThreadRefs = list;
    }

    public Function<FahAsyncStreamPipe<E>.SimplePipeConsumerThread, Future> getThreadPoolFunc() {
        return this.threadPoolFunc;
    }

    public void setThreadPoolFunc(Function<FahAsyncStreamPipe<E>.SimplePipeConsumerThread, Future> function) {
        this.threadPoolFunc = function;
    }

    public int getPipeStatus() {
        return this.pipeStatus.get();
    }

    public Function<Object, BiConsumer<Integer, IFahIAsyncStreamDataBlock>> getConsumerSupplier() {
        return this.consumerSupplier;
    }

    public void setConsumerSupplier(Function<Object, BiConsumer<Integer, IFahIAsyncStreamDataBlock>> function) {
        this.consumerSupplier = function;
        __changePipeLock(function != null);
    }

    public void setDataConsumer(BiConsumer<Integer, IFahIAsyncStreamDataBlock<E>> biConsumer) {
        setConsumerSupplier(obj -> {
            return (num, iFahIAsyncStreamDataBlock) -> {
                biConsumer.accept(num, iFahIAsyncStreamDataBlock);
            };
        });
    }

    protected void waitForDataBlockCntNotify(int i) {
        while (getWaitingDataBlockCnt() > i) {
            try {
                synchronized (this.waitingDataBlockCnt) {
                    this.waitingDataBlockCnt.wait(500L);
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public void waitForDataBlockCnt(int i) {
        waitForDataBlockCntNotify(i);
    }

    public boolean waitQueueDataCompleted() {
        while (!isEmpty()) {
            waitForDataBlockCntNotify(0);
        }
        return true;
    }

    @Override // kd.fi.v2.fah.storage.IDataItemKey
    public Object getItemKey() {
        return this.pipeItemKey;
    }

    public int getMaxConsumerCnt() {
        return this.maxConsumerCnt;
    }

    public void setMaxConsumerCnt(int i) {
        this.maxConsumerCnt = i;
    }

    public int getQueuePreConsumerThreshold() {
        return this.queuePreConsumerThreshold;
    }

    public void setQueuePreConsumerThreshold(int i) {
        this.queuePreConsumerThreshold = i;
    }

    public long getConsumerOnEmptyQueue_SleepTime() {
        return this.ConsumerOnEmptyQueue_SleepTime;
    }

    public void setConsumerOnEmptyQueue_SleepTime(long j) {
        this.ConsumerOnEmptyQueue_SleepTime = j;
    }

    public int getEmptyQueue_ReCheckTimes() {
        return this.EmptyQueue_ReCheckTimes;
    }

    public void setEmptyQueue_ReCheckTimes(int i) {
        this.EmptyQueue_ReCheckTimes = i;
    }

    public ILogHandler getLogger() {
        return this.logger;
    }

    public void setLogger(ILogHandler iLogHandler) {
        this.logger = iLogHandler;
    }
}
