package kd.fi.v2.fah.utils.pipe;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import kd.bos.threads.ThreadPool;
import kd.bos.threads.ThreadPools;
import kd.fi.bd.model.common.PairTuple;
import kd.fi.bd.util.pipe.datablock.IAsyncStreamDataBlock;

/* loaded from: input_file:kd/fi/v2/fah/utils/pipe/PipePoolFactory.class */
public class PipePoolFactory {
    private static PipePoolFactory instance = new PipePoolFactory();
    private Map<Object, PairTuple<FahAsyncStreamPipe, ThreadPool>> pipeStorage;

    public static PipePoolFactory getInstance() {
        return instance;
    }

    protected PipePoolFactory() {
    }

    protected ThreadPool createThreadPoolFunc(String str, int i, int i2) {
        return ThreadPools.newCachedThreadPool(str, i, i2);
    }

    protected FahAsyncStreamPipe registerPipeInstance(FahRegisteredAsyncPipeEnum fahRegisteredAsyncPipeEnum) {
        if (fahRegisteredAsyncPipeEnum == null) {
            return null;
        }
        if (this.pipeStorage == null) {
            this.pipeStorage = new HashMap(4);
        }
        ThreadPool createThreadPoolFunc = createThreadPoolFunc(fahRegisteredAsyncPipeEnum.name(), 0, fahRegisteredAsyncPipeEnum.getDefaultMaxThreadCnt());
        FahAsyncStreamPipe fahAsyncStreamPipe = new FahAsyncStreamPipe(fahRegisteredAsyncPipeEnum.getDefaultMaxThreadCnt(), simplePipeConsumerThread -> {
            return createThreadPoolFunc.submit(simplePipeConsumerThread);
        });
        this.pipeStorage.put(fahRegisteredAsyncPipeEnum.name(), new PairTuple<>(fahAsyncStreamPipe, createThreadPoolFunc));
        return fahAsyncStreamPipe;
    }

    public void stopAndRemove(FahRegisteredAsyncPipeEnum fahRegisteredAsyncPipeEnum, boolean z) {
        if (hasPipeInstance(fahRegisteredAsyncPipeEnum)) {
            PairTuple<FahAsyncStreamPipe, ThreadPool> remove = this.pipeStorage.remove(fahRegisteredAsyncPipeEnum.name());
            ((FahAsyncStreamPipe) remove.getKey()).close(z);
            ((ThreadPool) remove.getValue()).close();
        }
    }

    protected boolean hasPipeInstance(FahRegisteredAsyncPipeEnum fahRegisteredAsyncPipeEnum) {
        return this.pipeStorage != null && this.pipeStorage.containsKey(fahRegisteredAsyncPipeEnum);
    }

    protected <T> T accessPipeInstance(FahRegisteredAsyncPipeEnum fahRegisteredAsyncPipeEnum, Function<FahAsyncStreamPipe, T> function) {
        FahAsyncStreamPipe pipe;
        if (fahRegisteredAsyncPipeEnum == null) {
            return null;
        }
        if (hasPipeInstance(fahRegisteredAsyncPipeEnum)) {
            pipe = getPipe(fahRegisteredAsyncPipeEnum);
        } else {
            FahAsyncStreamPipe registerPipeInstance = registerPipeInstance(fahRegisteredAsyncPipeEnum);
            pipe = registerPipeInstance;
            if (registerPipeInstance == null) {
                throw new IllegalArgumentException(String.format("Failed on Register Pipe Type [%s]!", fahRegisteredAsyncPipeEnum));
            }
        }
        return function.apply(pipe);
    }

    public <E> FahAsyncStreamPipe<E> getPipe(FahRegisteredAsyncPipeEnum fahRegisteredAsyncPipeEnum) {
        if (hasPipeInstance(fahRegisteredAsyncPipeEnum)) {
            return (FahAsyncStreamPipe) this.pipeStorage.get(fahRegisteredAsyncPipeEnum.name()).getKey();
        }
        return null;
    }

    public <E> IAsyncStreamDataBlock<E> putToQueue(FahRegisteredAsyncPipeEnum fahRegisteredAsyncPipeEnum, E e, boolean z) {
        return (IAsyncStreamDataBlock) accessPipeInstance(fahRegisteredAsyncPipeEnum, fahAsyncStreamPipe -> {
            return fahAsyncStreamPipe.putToQueue((FahAsyncStreamPipe) e, z);
        });
    }

    public void waitQueueDataCompleted(FahRegisteredAsyncPipeEnum fahRegisteredAsyncPipeEnum) {
        accessPipeInstance(fahRegisteredAsyncPipeEnum, fahAsyncStreamPipe -> {
            return Boolean.valueOf(fahAsyncStreamPipe.waitQueueDataCompleted());
        });
    }

    public int getWaitingDataBlockCnt(FahRegisteredAsyncPipeEnum fahRegisteredAsyncPipeEnum) {
        return ((Integer) accessPipeInstance(fahRegisteredAsyncPipeEnum, fahAsyncStreamPipe -> {
            return Integer.valueOf(fahAsyncStreamPipe.getWaitingDataBlockCnt());
        })).intValue();
    }

    public int getActiveConsumerCnt(FahRegisteredAsyncPipeEnum fahRegisteredAsyncPipeEnum) {
        return ((Integer) accessPipeInstance(fahRegisteredAsyncPipeEnum, fahAsyncStreamPipe -> {
            return Integer.valueOf(fahAsyncStreamPipe.getActiveConsumerCnt());
        })).intValue();
    }
}
