package kd.fi.bd.util.pipe;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.fi.bd.indexing.constant.CDCStageEnum;
import kd.fi.bd.threads.FIConfigurableThreadService;
import kd.fi.bd.threads.ThreadCategoryEnum;
import kd.fi.bd.util.exception.IExceptionListener;
import kd.fi.bd.util.pipe.datablock.IAsyncStreamDataBlock;
import kd.fi.bd.util.pipe.datablock.SimpleAsyncStreamDataBlock;

/* loaded from: input_file:kd/fi/bd/util/pipe/SimpleAsyncPipe.class */
/**
 * A minimal asynchronous pipe.
 *
 * <p>Every data block handed to one of the {@code putToQueue} overloads is wrapped in a task and
 * passed to the configured thread executor. The task feeds the block to the configured data
 * consumer and afterwards reports the outcome to the block through
 * {@link IAsyncStreamDataBlock#onStageCompleted}.
 *
 * <p>NOTE(review): the configuration fields are plain (neither {@code volatile} nor guarded by a
 * lock), so reconfiguring the pipe while tasks are in flight carries no visibility guarantee.
 *
 * @param <E> type of the payload carried by each data block
 */
public class SimpleAsyncPipe<E> {
    protected final Log logger;
    protected ThreadCategoryEnum threadCategoryEnum;
    protected BiConsumer<ThreadCategoryEnum, IAsyncStreamDataBlock<E>> dataConsumer;
    protected Consumer<Runnable> threadExecutor;
    protected IExceptionListener exceptionListener;

    /**
     * Creates a pipe with an explicit executor and logger.
     *
     * @param threadCategoryEnum thread category passed to the data consumer with every block
     * @param biConsumer         consumer invoked once per queued block
     * @param consumer           executor that receives the task created for each queued block
     * @param log                logger used to report consumer failures
     * @throws IllegalArgumentException if any argument is {@code null}
     */
    public SimpleAsyncPipe(ThreadCategoryEnum threadCategoryEnum, BiConsumer<ThreadCategoryEnum, IAsyncStreamDataBlock<E>> biConsumer, Consumer<Runnable> consumer, Log log) {
        if (log == null) {
            throw new IllegalArgumentException("Logger is null!");
        }
        if (threadCategoryEnum == null) {
            throw new IllegalArgumentException("Thread Category cannot be null!");
        }
        this.logger = log;
        this.threadCategoryEnum = threadCategoryEnum;
        setDataConsumer(biConsumer);
        setThreadExecutor(consumer);
    }

    /**
     * Creates a pipe that submits its tasks to {@link FIConfigurableThreadService} under the given
     * thread category and logs through a logger obtained for this class.
     *
     * <p>NOTE: the default executor captures the {@code threadCategoryEnum} argument itself, so a
     * later {@link #setThreadCategoryEnum} call changes the category handed to the data consumer
     * but not the category used for submission.
     *
     * @param threadCategoryEnum thread category used for submission and passed to the consumer
     * @param biConsumer         consumer invoked once per queued block
     * @throws IllegalArgumentException if any argument is {@code null}
     */
    public SimpleAsyncPipe(ThreadCategoryEnum threadCategoryEnum, BiConsumer<ThreadCategoryEnum, IAsyncStreamDataBlock<E>> biConsumer) {
        this(threadCategoryEnum, biConsumer, runnable -> {
            FIConfigurableThreadService.getInstance().execute(threadCategoryEnum, runnable);
        }, LogFactory.getLog(SimpleAsyncPipe.class));
    }

    /**
     * Feeds one block to the data consumer and reports the outcome to the block.
     *
     * <p>An {@link Exception} thrown by the consumer is forwarded to the exception listener (when
     * one is set), logged, and turned into a {@code false} result. Any other {@link Throwable}
     * propagates to the caller. In every case the block's completion callback is invoked exactly
     * once, with the flag telling whether the consumer returned normally.
     *
     * @param threadCategoryEnum    category handed to the consumer
     * @param iAsyncStreamDataBlock block to consume
     * @return {@code true} if the consumer returned normally, {@code false} if it threw an
     *         {@link Exception}
     */
    protected boolean doDataConsume(ThreadCategoryEnum threadCategoryEnum, IAsyncStreamDataBlock<E> iAsyncStreamDataBlock) {
        boolean succeeded = false;
        try {
            this.dataConsumer.accept(threadCategoryEnum, iAsyncStreamDataBlock);
            succeeded = true;
        } catch (Exception e) {
            // Read the field once: it may be reassigned from another thread between check and use.
            IExceptionListener listener = this.exceptionListener;
            if (listener != null) {
                listener.onError(e);
            }
            this.logger.error(e.getMessage(), e);
        } finally {
            // A single completion notification per block, whatever happened above. Keeping it in
            // 'finally' (instead of repeating it in each branch) prevents the callback from being
            // re-invoked when the callback itself throws.
            iAsyncStreamDataBlock.onStageCompleted(CDCStageEnum.Completed, succeeded);
        }
        return succeeded;
    }

    /**
     * Queues every element of a collection, each wrapped in its own data block.
     *
     * @param obj        first constructor argument of every created {@link SimpleAsyncStreamDataBlock}
     * @param collection payloads to queue, in iteration order; must not be {@code null}
     * @return the created blocks, in the same order as the payloads
     */
    public List<IAsyncStreamDataBlock<E>> putToQueue(Object obj, Collection<E> collection) {
        List<IAsyncStreamDataBlock<E>> queuedBlocks = new LinkedList<>();
        for (E payload : collection) {
            queuedBlocks.add(putToQueue(obj, payload));
        }
        return queuedBlocks;
    }

    /**
     * Queues every element of a collection, passing {@code null} as the first block argument.
     *
     * @param collection payloads to queue; must not be {@code null}
     * @return the created blocks, in the same order as the payloads
     */
    public List<IAsyncStreamDataBlock<E>> putToQueue(Collection<E> collection) {
        return putToQueue((Object) null, collection);
    }

    /**
     * Wraps a single payload in a {@link SimpleAsyncStreamDataBlock} and queues it.
     *
     * @param obj first constructor argument of the created block
     * @param e   payload
     * @return the queued block
     */
    public IAsyncStreamDataBlock<E> putToQueue(Object obj, E e) {
        // The generic signature of SimpleAsyncStreamDataBlock is not visible from this file, so the
        // construction stays raw and the unchecked conversion is confined to this one declaration.
        @SuppressWarnings({"unchecked", "rawtypes"})
        IAsyncStreamDataBlock<E> block = (IAsyncStreamDataBlock<E>) new SimpleAsyncStreamDataBlock(obj, e);
        return putToQueue(block);
    }

    /**
     * Wraps a single payload in a data block, passing {@code null} as the first block argument,
     * and queues it.
     *
     * @param e payload
     * @return the queued block
     */
    public IAsyncStreamDataBlock<E> putToQueue(E e) {
        return putToQueue((Object) null, e);
    }

    /**
     * Wraps a single payload in a {@link SimpleAsyncStreamDataBlock} built with the five-argument
     * constructor and queues it. The meaning of {@code i}, {@code i2} and {@code z} is defined by
     * that constructor and is not visible from this class.
     *
     * @param obj first constructor argument of the created block
     * @param e   payload
     * @param i   third constructor argument, forwarded unchanged
     * @param i2  fourth constructor argument, forwarded unchanged
     * @param z   fifth constructor argument, forwarded unchanged
     * @return the queued block
     */
    public IAsyncStreamDataBlock<E> putToQueue(Object obj, E e, int i, int i2, boolean z) {
        @SuppressWarnings({"unchecked", "rawtypes"})
        IAsyncStreamDataBlock<E> block = (IAsyncStreamDataBlock<E>) new SimpleAsyncStreamDataBlock(obj, e, i, i2, z);
        return putToQueue(block);
    }

    /**
     * Hands a ready-made block to the thread executor. The submitted task calls
     * {@link #doDataConsume} with the thread category that is current when the task runs.
     *
     * @param iAsyncStreamDataBlock block to process
     * @return the same block, so callers can keep a handle on it
     */
    public IAsyncStreamDataBlock<E> putToQueue(IAsyncStreamDataBlock<E> iAsyncStreamDataBlock) {
        this.threadExecutor.accept(() -> {
            doDataConsume(this.threadCategoryEnum, iAsyncStreamDataBlock);
        });
        return iAsyncStreamDataBlock;
    }

    /** @return the thread category handed to the data consumer */
    public ThreadCategoryEnum getThreadCategoryEnum() {
        return this.threadCategoryEnum;
    }

    /**
     * Replaces the thread category handed to the data consumer.
     *
     * @param threadCategoryEnum new category
     * @throws IllegalArgumentException if the argument is {@code null}; the current category is
     *                                  left untouched in that case
     */
    public void setThreadCategoryEnum(ThreadCategoryEnum threadCategoryEnum) {
        // Validate before assigning so a rejected value cannot corrupt the current configuration.
        if (threadCategoryEnum == null) {
            throw new IllegalArgumentException("Thread Category cannot be null!");
        }
        this.threadCategoryEnum = threadCategoryEnum;
    }

    /** @return the consumer invoked for every queued block */
    public BiConsumer<ThreadCategoryEnum, IAsyncStreamDataBlock<E>> getDataConsumer() {
        return this.dataConsumer;
    }

    /** @return the logger used to report consumer failures */
    public Log getLogger() {
        return this.logger;
    }

    /**
     * Replaces the consumer invoked for every queued block.
     *
     * @param biConsumer new consumer
     * @throws IllegalArgumentException if the argument is {@code null}; the current consumer is
     *                                  left untouched in that case
     */
    public void setDataConsumer(BiConsumer<ThreadCategoryEnum, IAsyncStreamDataBlock<E>> biConsumer) {
        if (biConsumer == null) {
            throw new IllegalArgumentException("Data Block Consumer cannot be null!");
        }
        this.dataConsumer = biConsumer;
    }

    /** @return the executor that receives the task created for each queued block */
    public Consumer<Runnable> getThreadExecutor() {
        return this.threadExecutor;
    }

    /**
     * Replaces the executor that receives the task created for each queued block.
     *
     * @param consumer new executor
     * @throws IllegalArgumentException if the argument is {@code null}; the current executor is
     *                                  left untouched in that case
     */
    public void setThreadExecutor(Consumer<Runnable> consumer) {
        if (consumer == null) {
            throw new IllegalArgumentException("Thread Pool Provider cannot be null!");
        }
        this.threadExecutor = consumer;
    }

    /** @return the listener notified of consumer exceptions, or {@code null} if none is set */
    public IExceptionListener getExceptionListener() {
        return this.exceptionListener;
    }

    /**
     * Sets the listener notified when the data consumer throws an {@link Exception}.
     *
     * @param iExceptionListener listener to notify; {@code null} disables notification
     */
    public void setExceptionListener(IExceptionListener iExceptionListener) {
        this.exceptionListener = iExceptionListener;
    }
}
