package kd.ai.aicc.core;

import java.util.concurrent.atomic.AtomicBoolean;
import kd.ai.aicc.core.client.SyncTaskLockManager;
import kd.ai.aicc.core.domain.Task;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.threads.ThreadPool;
import kd.bos.threads.ThreadPools;

/* loaded from: input_file:kd/ai/aicc/core/Dispatcher.class */
/**
 * Process-wide task dispatcher.
 *
 * <p>A single instance (see {@link #getInstance()}) owns a {@code Queue} and an
 * {@code Executor}. Its {@link #run()} loop is submitted to a dedicated thread pool
 * and repeatedly drains the queue, handing every task to the executor. Between drain
 * passes the loop sleeps on an internal monitor until it is signalled (new task, task
 * completed, stop request) or until {@code IDLE_WAIT_MILLIS} elapses.
 *
 * <p>Thread-safety: the public methods may be called from any thread. The run state is
 * held in atomics and the wake-up flag is guarded by the internal monitor.
 */
public class Dispatcher implements Runnable {
    public static final String DISPATCHER_THREAD_POOL_NAME = "ai-aicc-dispatcher-threadpool";

    private static final Log log = LogFactory.getLog(Dispatcher.class);
    // Pool created with a size argument of 1; presumably a single worker thread - confirm against ThreadPools.
    private static final ThreadPool threadPool = ThreadPools.newFixedThreadPool(DISPATCHER_THREAD_POOL_NAME, 1, Constant.APP_ID);
    /** Upper bound, in milliseconds, on how long the loop sleeps when nobody signals it. */
    private static final long IDLE_WAIT_MILLIS = 5000L;
    private static Dispatcher instance = null;

    private Queue queue;
    private Executor executor;
    /** Whether the dispatcher is supposed to be dispatching. */
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    /** Whether a dispatch loop is currently running or queued on the pool. */
    private final AtomicBoolean loopActive = new AtomicBoolean(false);
    /** Monitor the dispatch loop sleeps on; also guards {@link #wakeupPending}. */
    private final Object notifyObject = new Object();
    /** Set when a signal arrives; lets the loop skip a wait it would otherwise oversleep. */
    private boolean wakeupPending;

    private Dispatcher() {
    }

    /**
     * Returns the shared dispatcher, creating and initialising it on first use.
     *
     * <p>If initialisation fails, the half-built instance is discarded so that a later
     * call retries instead of handing out an object whose queue/executor are unset.
     *
     * @return the initialised singleton
     */
    public static synchronized Dispatcher getInstance() {
        if (instance == null) {
            Dispatcher created = new Dispatcher();
            // Publish before init() so a re-entrant getInstance() during init behaves as it always did.
            instance = created;
            try {
                created.init();
            } catch (RuntimeException | Error e) {
                // Roll back: stop any loop that might already have been launched and forget the instance.
                created.isRunning.set(false);
                instance = null;
                throw e;
            }
        }
        return instance;
    }

    /** Initialises the data manager, queue and executor, then launches the dispatch loop. */
    private void init() {
        DataManager.getInstance().init();
        this.queue = new Queue();
        this.queue.init();
        this.executor = new Executor();
        this.executor.init();
        this.isRunning.set(true);
        launchLoopIfIdle();
        log.info("初始化完成");
    }

    /**
     * Marks the dispatcher as running and makes sure a dispatch loop exists.
     * Calling this after {@link #stop()} relaunches the loop if it has already exited.
     */
    public void start() {
        this.isRunning.set(true);
        launchLoopIfIdle();
        log.info("服务启动");
    }

    /** Requests the dispatch loop to exit and wakes it so it notices without waiting out its sleep. */
    public void stop() {
        log.warn("收到服务停止信号");
        this.isRunning.set(false);
        wakeDispatcher();
    }

    /**
     * @return {@code true} while the dispatcher is flagged as running
     */
    public boolean isStarted() {
        return this.isRunning.get();
    }

    /**
     * Submits a task; currently identical to {@link #newASyncTask(Task)}.
     *
     * @param task the task to enqueue
     */
    public void newSyncTask(Task task) {
        newASyncTask(task);
    }

    /**
     * Records the request context for the task, enqueues it and wakes the dispatch loop.
     *
     * @param task the task to enqueue
     */
    public void newASyncTask(Task task) {
        DataManager.getInstance().setTaskRequestContext(task);
        this.queue.enqueue(task);
        wakeDispatcher();
    }

    /**
     * Wakes the dispatch loop and then calls {@code SyncTaskLockManager.release} with the task id.
     *
     * @param task the task that finished
     */
    public void taskCompleted(Task task) {
        wakeDispatcher();
        SyncTaskLockManager.release(task.getId());
    }

    /**
     * Dispatch loop entry point, executed on the dispatcher thread pool.
     * Runs until {@link #stop()} is called or the thread is interrupted.
     */
    @Override
    public void run() {
        log.info("任务分发器开始运行");
        this.loopActive.set(true);
        boolean resume;
        do {
            try {
                dispatchUntilStopped();
            } finally {
                this.loopActive.set(false);
            }
            // A start() that raced with our exit saw loopActive == true and did not relaunch;
            // pick the work back up here unless that start() already claimed the slot itself.
            resume = this.isRunning.get() && this.loopActive.compareAndSet(false, true);
        } while (resume);
        log.info("任务分发器结束运行");
    }

    /** Submits the dispatch loop to the pool unless one is already running or queued. */
    private void launchLoopIfIdle() {
        if (!this.loopActive.compareAndSet(false, true)) {
            return;
        }
        try {
            threadPool.execute(this);
        } catch (RuntimeException | Error e) {
            // Submission failed: release the slot so a later start() can try again.
            this.loopActive.set(false);
            throw e;
        }
    }

    /** Signals the dispatch loop; the flag keeps the signal from being lost if the loop is not waiting yet. */
    private void wakeDispatcher() {
        synchronized (this.notifyObject) {
            this.wakeupPending = true;
            this.notifyObject.notifyAll();
        }
    }

    /** Alternates drain passes and idle waits until the running flag is cleared or the thread is interrupted. */
    private void dispatchUntilStopped() {
        while (this.isRunning.get()) {
            try {
                running();
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
            if (!awaitWakeup()) {
                return;
            }
        }
    }

    /**
     * Sleeps until signalled or until the idle timeout elapses.
     *
     * @return {@code false} if the thread was interrupted and the loop must exit, {@code true} otherwise
     */
    private boolean awaitWakeup() {
        synchronized (this.notifyObject) {
            try {
                // Skip the wait when a signal arrived during the drain pass or a stop was requested.
                if (!this.wakeupPending && this.isRunning.get()) {
                    this.notifyObject.wait(IDLE_WAIT_MILLIS);
                }
                this.wakeupPending = false;
                return true;
            } catch (InterruptedException e) {
                // With the flag restored every further wait() would throw at once, so looping on
                // would busy-spin. Treat the interrupt as a stop request instead.
                Thread.currentThread().interrupt();
                log.error(e.getMessage(), e);
                this.isRunning.set(false);
                return false;
            }
        }
    }

    /**
     * One drain pass: hands tasks from the queue to the executor until the queue yields
     * {@code null} or the dispatcher is stopped.
     */
    private void running() {
        // Check the flag before fetching so a task is never taken from the queue and then dropped.
        while (this.isRunning.get()) {
            Task task = this.queue.next();
            if (task == null) {
                return;
            }
            try {
                this.executor.executeTask(task, task.getInstance());
            } catch (Exception e) {
                // Log first: a failure during the cleanup below must not hide the root cause.
                log.error(e);
                try {
                    task.releaseLock();
                } finally {
                    // Attempt the resource release even when releasing the lock failed.
                    task.getInstance().releaseResource();
                }
            }
        }
    }
}
