package kd.bos.dts.consume.impl.storageQueue;

import kd.bos.dts.Constant;
import kd.bos.threads.ThreadPool;
import kd.bos.threads.ThreadPools;
import kd.bos.util.async.SetQueue;

/* loaded from: input_file:kd/bos/dts/consume/impl/storageQueue/QueueConsumeExecutor.class */
/**
 * Static dispatcher that moves queued {@link DtsDataConsumerAsyncTask} instances from an
 * in-memory {@link SetQueue} onto a shared {@link ThreadPool}.
 *
 * <p>Loading this class starts one thread named {@code DispatcherDtsDataConsumerThread}
 * that repeatedly polls the queue and hands every non-null task to the pool. The loop
 * condition is {@link #isStart()}, which currently always returns {@code true}, so the
 * dispatcher is never stopped by this class.
 */
public class QueueConsumeExecutor {
    /** Tasks waiting to be handed to {@link #pool}. */
    private static final SetQueue<DtsDataConsumerAsyncTask> taskQueue = new SetQueue<>();

    /**
     * Pool that runs the dequeued tasks. The last argument is read from the system property
     * named by {@link Constant#DTS_CONSUMER_FASTMODE_MAXTHREAD} and falls back to 8.
     * NOTE(review): the arguments 2 and 8 are presumably the minimum and maximum thread
     * counts - confirm against ThreadPools.newCachedThreadPool.
     */
    private static final ThreadPool pool = ThreadPools.newCachedThreadPool(
            "DtsDataConsumerAsyncThread",
            2,
            Integer.getInteger(Constant.DTS_CONSUMER_FASTMODE_MAXTHREAD, 8).intValue());

    /**
     * Enqueues a task for asynchronous execution.
     *
     * <p>NOTE(review): the call is {@code putIfAbsent}, which suggests a task already present
     * in the queue is not added a second time - confirm against SetQueue.
     *
     * @param dtsDataConsumerAsyncTask the task to enqueue
     */
    public static void addTask(DtsDataConsumerAsyncTask dtsDataConsumerAsyncTask) {
        taskQueue.putIfAbsent(dtsDataConsumerAsyncTask);
    }

    /**
     * Loop condition of the dispatcher thread.
     *
     * @return always {@code true}
     */
    private static boolean isStart() {
        return true;
    }

    /**
     * Body of the dispatcher thread: polls the queue and submits each non-null task to the pool.
     *
     * <p>A {@link RuntimeException} raised while polling or submitting is reported and the
     * loop continues. Without this guard a single failure would terminate the only dispatcher
     * thread and every task enqueued afterwards would stay in the queue forever.
     * {@link Error}s are deliberately not caught.
     */
    private static void dispatchLoop() {
        while (isStart()) {
            try {
                DtsDataConsumerAsyncTask task = (DtsDataConsumerAsyncTask) taskQueue.poll();
                if (task != null) {
                    pool.execute(task);
                }
            } catch (RuntimeException e) {
                reportDispatchFailure(e);
            }
        }
    }

    /**
     * Reports a dispatch failure through the current thread's uncaught-exception handler,
     * so the failure is surfaced wherever the application routes such errors, without
     * ending the thread.
     *
     * @param failure the exception raised by one dispatch iteration
     */
    private static void reportDispatchFailure(RuntimeException failure) {
        Thread current = Thread.currentThread();
        current.getUncaughtExceptionHandler().uncaughtException(current, failure);
    }

    static {
        // Declared after the fields so that taskQueue and pool are initialized before the
        // dispatcher thread is started.
        new Thread(QueueConsumeExecutor::dispatchLoop, "DispatcherDtsDataConsumerThread").start();
    }
}
