package kd.fi.bd.util.pipe;

import java.io.Closeable;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.threads.ThreadPool;
import kd.bos.threads.ThreadPools;
import kd.fi.bd.indexing.constant.CDCStageEnum;
import kd.fi.bd.util.exception.IExceptionListener;
import kd.fi.bd.util.pipe.datablock.IAsyncStreamDataBlock;
import kd.fi.bd.util.pipe.datablock.SimpleAsyncStreamDataBlock;

/* loaded from: input_file:kd/fi/bd/util/pipe/AsyncStreamPipe.class */
public class AsyncStreamPipe<E> {
    private static final ThreadPool CONSUMER_POOL = ThreadPools.newCachedThreadPool(AsyncStreamPipe.class.getName(), 1, 10);
    private static final Log logger = LogFactory.getLog(AsyncStreamPipe.class);
    public static final int PIPE_NOT_RUNNING = 0;
    public static final int PIPE_STARTING = 1;
    public static final int PIPE_RUNNING = 2;
    public static final int PIPE_STOPPING = 3;
    protected ConcurrentLinkedQueue<IAsyncStreamDataBlock<E>> dataQueue;
    protected BiConsumer<Integer, IAsyncStreamDataBlock<E>> dataConsumer;
    protected AtomicInteger pipeStatus;
    protected AtomicInteger activeConsumerCnt;
    protected AtomicInteger waitingDataBlockCnt;
    protected IExceptionListener exceptionListener;
    protected int consumerThreadCnt;
    protected List<Future> consumerThreadRefs;
    protected Function<AsyncStreamPipe<E>.PipeConsumerTask, Future> threadPoolFunc;

    /* loaded from: input_file:kd/fi/bd/util/pipe/AsyncStreamPipe$PipeConsumerTask.class */
    public class PipeConsumerTask implements Callable<Integer> {
        final int consumerIndex;
        final BiConsumer<Integer, IAsyncStreamDataBlock<E>> consumer;

        public PipeConsumerTask(int i, BiConsumer<Integer, IAsyncStreamDataBlock<E>> biConsumer) {
            this.consumerIndex = i;
            this.consumer = biConsumer;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Integer call() throws Exception {
            return Integer.valueOf(AsyncStreamPipe.this.doDataConsume(this.consumerIndex, this.consumer));
        }
    }

    public AsyncStreamPipe(int i, Function<AsyncStreamPipe<E>.PipeConsumerTask, Future> function) {
        this.dataQueue = new ConcurrentLinkedQueue<>();
        this.pipeStatus = new AtomicInteger(0);
        this.activeConsumerCnt = new AtomicInteger(0);
        this.consumerThreadCnt = i;
        this.consumerThreadRefs = new LinkedList();
        this.waitingDataBlockCnt = new AtomicInteger(0);
        this.threadPoolFunc = function;
        if (function == null) {
            throw new IllegalArgumentException("Pipe required Thread Pool cannot be null!");
        }
    }

    /* JADX WARN: Illegal instructions before constructor call */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public AsyncStreamPipe(int r6) {
        /*
            r5 = this;
            r0 = r5
            r1 = r6
            kd.bos.threads.ThreadPool r2 = kd.fi.bd.util.pipe.AsyncStreamPipe.CONSUMER_POOL
            r3 = r2
            java.lang.Class r3 = r3.getClass()
            void r2 = (v1) -> { // java.util.function.Function.apply(java.lang.Object):java.lang.Object
                return r2.submit(v1);
            }
            r0.<init>(r1, r2)
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: kd.fi.bd.util.pipe.AsyncStreamPipe.<init>(int):void");
    }

    public AsyncStreamPipe() {
        this(1);
    }

    public AsyncStreamPipe(int i, Function<AsyncStreamPipe<E>.PipeConsumerTask, Future> function, BiConsumer<Integer, IAsyncStreamDataBlock<E>> biConsumer) {
        this(i, function);
        attach(biConsumer);
    }

    public void attach(BiConsumer<Integer, IAsyncStreamDataBlock<E>> biConsumer) {
        if (isRunning()) {
            close();
        }
        updatePipeStatus(1);
        this.dataConsumer = biConsumer;
        for (int i = 0; i < this.consumerThreadCnt; i++) {
            this.consumerThreadRefs.add(this.threadPoolFunc.apply(new PipeConsumerTask(i, biConsumer)));
        }
        updatePipeStatus(2);
    }

    public void restart(boolean z) {
        if (z) {
            clearDataQueue();
        }
        attach(this.dataConsumer);
    }

    protected void clearDataQueue() {
        while (!this.dataQueue.isEmpty()) {
            IAsyncStreamDataBlock<E> poll = this.dataQueue.poll();
            if (poll != null) {
                poll.onStageCompleted(CDCStageEnum.Canceled, false);
            }
        }
        this.dataQueue.clear();
        this.waitingDataBlockCnt.set(0);
    }

    protected void updatePipeStatus(int i) {
        this.pipeStatus.set(i);
    }

    public boolean isStopped() {
        int i = this.pipeStatus.get();
        return i == 0 || i == 3;
    }

    public boolean isRunning() {
        int i = this.pipeStatus.get();
        return i == 2 || i == 1;
    }

    protected int doDataConsume(int i, BiConsumer<Integer, IAsyncStreamDataBlock<E>> biConsumer) {
        IAsyncStreamDataBlock<E> poll;
        this.activeConsumerCnt.incrementAndGet();
        synchronized (this.activeConsumerCnt) {
            this.activeConsumerCnt.notifyAll();
        }
        IAsyncStreamDataBlock<E> iAsyncStreamDataBlock = null;
        while (isRunning()) {
            try {
                poll = this.dataQueue.poll();
                iAsyncStreamDataBlock = poll;
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                if (this.exceptionListener != null) {
                    this.exceptionListener.onError(e);
                }
                if (iAsyncStreamDataBlock != null) {
                    iAsyncStreamDataBlock.onStageCompleted(CDCStageEnum.Completed, false);
                }
            }
            if (poll != null) {
                this.waitingDataBlockCnt.decrementAndGet();
                if (this.dataQueue.isEmpty()) {
                    synchronized (this.dataQueue) {
                        this.dataQueue.notifyAll();
                    }
                }
                biConsumer.accept(Integer.valueOf(i), iAsyncStreamDataBlock);
                iAsyncStreamDataBlock.onStageCompleted(CDCStageEnum.Completed, true);
            } else {
                synchronized (this.dataQueue) {
                    this.dataQueue.wait(500L);
                }
            }
        }
        int decrementAndGet = this.activeConsumerCnt.decrementAndGet();
        synchronized (this.activeConsumerCnt) {
            this.activeConsumerCnt.notifyAll();
        }
        return decrementAndGet;
    }

    protected static void closeClosable(Closeable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (Exception e) {
            }
        }
    }

    public void close(boolean z) {
        if (isRunning()) {
            if (z) {
                clearDataQueue();
            } else {
                while (!this.dataQueue.isEmpty()) {
                    try {
                        synchronized (this.dataQueue) {
                            this.dataQueue.wait(500L);
                        }
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
            updatePipeStatus(3);
            synchronized (this.dataQueue) {
                this.dataQueue.notifyAll();
            }
            Iterator<Future> it = this.consumerThreadRefs.iterator();
            while (it.hasNext()) {
                try {
                    it.next().get();
                } catch (InterruptedException | ExecutionException e2) {
                }
            }
            updatePipeStatus(0);
        }
    }

    public void close() {
        close(false);
    }

    public List<IAsyncStreamDataBlock<E>> putToQueue(Object obj, Collection<E> collection) {
        LinkedList linkedList = new LinkedList();
        Iterator<E> it = collection.iterator();
        while (it.hasNext()) {
            linkedList.add(putToQueue(obj, it.next()));
        }
        return linkedList;
    }

    public List<IAsyncStreamDataBlock<E>> putToQueue(Collection<E> collection) {
        return putToQueue((Object) null, (Collection) collection);
    }

    public IAsyncStreamDataBlock<E> putToQueue(Object obj, E e) {
        return putToQueue((IAsyncStreamDataBlock) new SimpleAsyncStreamDataBlock(obj, e));
    }

    public IAsyncStreamDataBlock<E> putToQueue(E e) {
        return putToQueue((Object) null, e);
    }

    public IAsyncStreamDataBlock<E> putToQueue(Object obj, E e, int i, int i2, boolean z) {
        return putToQueue((IAsyncStreamDataBlock) new SimpleAsyncStreamDataBlock(obj, e, i, i2, z));
    }

    public IAsyncStreamDataBlock<E> putToQueue(IAsyncStreamDataBlock<E> iAsyncStreamDataBlock) {
        this.dataQueue.offer(iAsyncStreamDataBlock);
        this.waitingDataBlockCnt.incrementAndGet();
        synchronized (this.dataQueue) {
            this.dataQueue.notifyAll();
        }
        return iAsyncStreamDataBlock;
    }

    public IAsyncStreamDataBlock<E> peekFromQueue() {
        return this.dataQueue.peek();
    }

    public int queueSize() {
        return this.dataQueue.size();
    }

    public boolean isEmpty() {
        return this.dataQueue.isEmpty();
    }

    public IExceptionListener getExceptionListener() {
        return this.exceptionListener;
    }

    public void setExceptionListener(IExceptionListener iExceptionListener) {
        this.exceptionListener = iExceptionListener;
    }

    public BiConsumer<Integer, IAsyncStreamDataBlock<E>> getDataConsumer() {
        return this.dataConsumer;
    }

    public int getConsumerThreadCnt() {
        return this.consumerThreadCnt;
    }

    public void setConsumerThreadCnt(int i) {
        this.consumerThreadCnt = i;
    }

    public int getActiveConsumerCnt() {
        return this.activeConsumerCnt.get();
    }

    public void setDataConsumer(BiConsumer<Integer, IAsyncStreamDataBlock<E>> biConsumer) {
        this.dataConsumer = biConsumer;
    }

    public int getPipeStatus() {
        return this.pipeStatus.get();
    }

    public int getWaitingDataBlockCnt() {
        return this.waitingDataBlockCnt.get();
    }

    public void waitForDataBlockCnt(int i) {
        while (this.waitingDataBlockCnt.get() > i) {
            try {
                synchronized (this.dataQueue) {
                    this.dataQueue.wait(500L);
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public void waitForCompleted() {
        while (this.waitingDataBlockCnt.get() > 0) {
            try {
                synchronized (this.dataQueue) {
                    this.dataQueue.wait(500L);
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
