package kd.ai.gai.core.rag.service;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import kd.ai.gai.core.Constant;
import kd.ai.gai.core.domain.dto.ChunkConfig;
import kd.ai.gai.core.domain.llm.EmbeddingParam;
import kd.ai.gai.core.enuz.EmbeddingModel;
import kd.ai.gai.core.enuz.LLM;
import kd.ai.gai.core.enuz.repo.ChunkRule;
import kd.ai.gai.core.enuz.repo.RepoSourceType;
import kd.ai.gai.core.rag.chunk.ChunkInput;
import kd.ai.gai.core.rag.chunk.ChunkServiceImpl;
import kd.ai.gai.core.rag.embedding.EmbeddingExeServiceFactory;
import kd.bos.context.OperationContext;
import kd.bos.context.RequestContext;
import kd.bos.dataentity.entity.DynamicObject;
import kd.bos.db.DB;
import kd.bos.db.DBRoute;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.thread.ThreadLifeCycleManager;
import kd.bos.threads.ThreadPool;
import kd.bos.threads.WaitingRejectedHandler;
import kd.bos.threads.impl.ThreadPoolImpl;

/* loaded from: input_file:kd/ai/gai/core/rag/service/RepoDispatchService.class */
public class RepoDispatchService {
    /** Maximum pool size handed to the {@link ThreadPoolExecutor} constructor. */
    private static final int POOL_MAX_SIZE = 10;

    /** Core pool size handed to the {@link ThreadPoolExecutor} constructor. */
    private static final int POOL_CORE_SIZE = 3;

    /** Keep-alive time for idle threads; the executor is built with {@link TimeUnit#MINUTES}. */
    private static final long KEEP_ALIVE_LIVE = 5;

    /** Capacity of the bounded work queue backing each executor. */
    private static final int MAX_FIX_QUEUESIZE = 1000000;

    /** Not referenced by any code visible in this file. */
    private static final String run_msg = "running";

    /** Class-wide logger; never reassigned, hence final. */
    private static final Log LOGGER = LogFactory.getLog(RepoDispatchService.class);

    /** Thread-name prefix of the chunking pool. */
    private static final String CHUNK_POOL_NAME = "ai-gai-pool-repo-chunk";

    /** Executor that runs {@link ChunkTask} instances. */
    private static final ExecutorService CHUNK_ES = createFixedThreadPool(CHUNK_POOL_NAME);

    /** Thread-name prefix of the embedding pool. */
    private static final String EMBEDDING_POOL_NAME = "ai-gai-pool-repo-embedding";

    /** Executor that runs {@link EmbeddingTask} instances. */
    private static final ExecutorService EMBEDDING_ES = createFixedThreadPool(EMBEDDING_POOL_NAME);

    /** Platform thread-pool wrapper around {@link #CHUNK_ES}; created with a no-op callback. */
    private static final ThreadPool REPO_CHUNK_POOL = new ThreadPoolImpl(CHUNK_ES, (OperationContext) null, bool -> {
    });

    /** Platform thread-pool wrapper around {@link #EMBEDDING_ES}; created with a no-op callback. */
    private static final ThreadPool REPO_EMBEDDING_POOL = new ThreadPoolImpl(EMBEDDING_ES, (OperationContext) null, bool -> {
    });

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kd/ai/gai/core/rag/service/RepoDispatchService$ChunkTask.class */
    /**
     * Runnable that performs one chunking job by delegating to a freshly
     * created {@link ChunkServiceImpl}.
     */
    public static class ChunkTask implements Runnable {
        /** Input handed to the chunk service when the task runs. */
        private final ChunkInput chunkInput;

        /**
         * @param chunkInput the input to pass to {@link ChunkServiceImpl#execute}
         */
        public ChunkTask(ChunkInput chunkInput) {
            this.chunkInput = chunkInput;
        }

        /** Creates a new service instance and executes it with the stored input. */
        @Override
        public void run() {
            final ChunkServiceImpl chunkService = new ChunkServiceImpl();
            chunkService.execute(chunkInput);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kd/ai/gai/core/rag/service/RepoDispatchService$EmbeddingTask.class */
    /**
     * Runnable that performs one embedding job: it resolves the embedding
     * service for the configured {@link LLM} and executes it with the stored
     * parameters.
     */
    public static class EmbeddingTask implements Runnable {
        /** Model descriptor used to look up the embedding service. */
        private final LLM llm;

        /** Parameters passed to the embedding service on execution. */
        private final EmbeddingParam embeddingParam;

        /**
         * @param llm            model descriptor used to select the service
         * @param embeddingParam parameters forwarded to the service
         */
        public EmbeddingTask(LLM llm, EmbeddingParam embeddingParam) {
            this.embeddingParam = embeddingParam;
            this.llm = llm;
        }

        /** Looks up the service via {@link EmbeddingExeServiceFactory} and runs it. */
        @Override
        public void run() {
            EmbeddingExeServiceFactory
                    .getService(EmbeddingModel.parse(llm))
                    .execute(embeddingParam);
        }
    }

    /**
     * Builds a {@link ThreadPoolExecutor} whose worker threads are named
     * {@code <str>-<n>} (n starting at 1) and wraps it with
     * {@link ThreadLifeCycleManager#wrapExecutorService}.
     *
     * <p>NOTE(review): per the JDK contract a {@link ThreadPoolExecutor} only grows
     * beyond its core size once the work queue is full. With a queue capacity of
     * {@code MAX_FIX_QUEUESIZE} the pool will in practice stay at
     * {@code POOL_CORE_SIZE} threads; confirm whether that is intended.
     *
     * @param str prefix used for the names of the pool's threads
     * @return the wrapped executor service
     */
    private static ExecutorService createFixedThreadPool(final String str) {
        // Per-pool counter so every thread gets a unique, readable name.
        final AtomicInteger threadSeq = new AtomicInteger(0);
        final ThreadFactory namedThreads =
                runnable -> new Thread(runnable, str + "-" + threadSeq.incrementAndGet());

        final ThreadPoolExecutor executor = new ThreadPoolExecutor(
                POOL_CORE_SIZE,
                POOL_MAX_SIZE,
                KEEP_ALIVE_LIVE,
                TimeUnit.MINUTES,
                new LinkedBlockingQueue<Runnable>(MAX_FIX_QUEUESIZE),
                namedThreads,
                new WaitingRejectedHandler());
        return ThreadLifeCycleManager.wrapExecutorService(executor);
    }

    /* JADX WARN: Removed duplicated region for block: B:54:0x021c A[Catch: Exception -> 0x02b8, TryCatch #0 {Exception -> 0x02b8, blocks: (B:3:0x000a, B:5:0x001b, B:6:0x004c, B:7:0x006c, B:10:0x007c, B:13:0x008c, B:16:0x009c, B:20:0x00ab, B:21:0x00c8, B:23:0x00d0, B:25:0x00d8, B:26:0x02ab, B:28:0x0100, B:29:0x0111, B:30:0x0130, B:32:0x013a, B:33:0x0168, B:34:0x0198, B:37:0x01a8, B:40:0x01b8, B:43:0x01c8, B:46:0x01d8, B:49:0x01e8, B:53:0x01f7, B:54:0x021c, B:57:0x022a, B:59:0x0239, B:61:0x0244, B:65:0x025a, B:66:0x028f), top: B:2:0x000a }] */
    /* JADX WARN: Removed duplicated region for block: B:57:0x022a A[Catch: Exception -> 0x02b8, TryCatch #0 {Exception -> 0x02b8, blocks: (B:3:0x000a, B:5:0x001b, B:6:0x004c, B:7:0x006c, B:10:0x007c, B:13:0x008c, B:16:0x009c, B:20:0x00ab, B:21:0x00c8, B:23:0x00d0, B:25:0x00d8, B:26:0x02ab, B:28:0x0100, B:29:0x0111, B:30:0x0130, B:32:0x013a, B:33:0x0168, B:34:0x0198, B:37:0x01a8, B:40:0x01b8, B:43:0x01c8, B:46:0x01d8, B:49:0x01e8, B:53:0x01f7, B:54:0x021c, B:57:0x022a, B:59:0x0239, B:61:0x0244, B:65:0x025a, B:66:0x028f), top: B:2:0x000a }] */
    /* JADX WARN: Removed duplicated region for block: B:59:0x0239 A[Catch: Exception -> 0x02b8, TryCatch #0 {Exception -> 0x02b8, blocks: (B:3:0x000a, B:5:0x001b, B:6:0x004c, B:7:0x006c, B:10:0x007c, B:13:0x008c, B:16:0x009c, B:20:0x00ab, B:21:0x00c8, B:23:0x00d0, B:25:0x00d8, B:26:0x02ab, B:28:0x0100, B:29:0x0111, B:30:0x0130, B:32:0x013a, B:33:0x0168, B:34:0x0198, B:37:0x01a8, B:40:0x01b8, B:43:0x01c8, B:46:0x01d8, B:49:0x01e8, B:53:0x01f7, B:54:0x021c, B:57:0x022a, B:59:0x0239, B:61:0x0244, B:65:0x025a, B:66:0x028f), top: B:2:0x000a }] */
    /* JADX WARN: Removed duplicated region for block: B:61:0x0244 A[Catch: Exception -> 0x02b8, TryCatch #0 {Exception -> 0x02b8, blocks: (B:3:0x000a, B:5:0x001b, B:6:0x004c, B:7:0x006c, B:10:0x007c, B:13:0x008c, B:16:0x009c, B:20:0x00ab, B:21:0x00c8, B:23:0x00d0, B:25:0x00d8, B:26:0x02ab, B:28:0x0100, B:29:0x0111, B:30:0x0130, B:32:0x013a, B:33:0x0168, B:34:0x0198, B:37:0x01a8, B:40:0x01b8, B:43:0x01c8, B:46:0x01d8, B:49:0x01e8, B:53:0x01f7, B:54:0x021c, B:57:0x022a, B:59:0x0239, B:61:0x0244, B:65:0x025a, B:66:0x028f), top: B:2:0x000a }] */
    /* JADX WARN: Removed duplicated region for block: B:63:0x0257 A[SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Decompiler stub: the original method body could not be reconstructed,
     * so in this form the method unconditionally throws.
     *
     * @param r8 decompiler-generated name; the argument is not read by this stub
     * @return never returns normally in this form
     * @throws UnsupportedOperationException always
     */
    public static boolean dispatchTask(long r8) {
        /*
            Method dump skipped, instructions count: 716
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: kd.ai.gai.core.rag.service.RepoDispatchService.dispatchTask(long):boolean");
    }

    /**
     * Sets the document row's file status to the "running embedding" value and
     * then queues an embedding task for it.
     *
     * @param j   repository id, forwarded to {@link #addEmbeddingTask}
     * @param llm model descriptor, forwarded to {@link #addEmbeddingTask}
     * @param j2  file id; matched against {@code fentryid} in the status update
     */
    private static void excEmbeddingTask(long j, LLM llm, long j2) {
        final DBRoute route = DBRoute.of(Constant.DB_KEY);
        final Object[] statusArgs = {Constant.RepoInfo.file_status_running_embedding, Long.valueOf(j2)};
        DB.update(route, "update t_gai_repo_doc_manage set ffilestatus=?  where fentryid=?", statusArgs);

        addEmbeddingTask(j, llm, j2);
    }

    /**
     * Sets the document row's file status to {@code "A"}, removes the chunks
     * previously stored for the file, and queues a new chunk task.
     *
     * @param j             repository id, forwarded to {@link #addChunkTask}
     * @param str           chunk strategy name, forwarded to {@link #addChunkTask}
     * @param llm           model descriptor, forwarded to {@link #addChunkTask}
     * @param dynamicObject document record, forwarded to {@link #addChunkTask}
     * @param j2            file id; matched against {@code fentryid} and {@code ffileid}
     */
    private static void clearOldChunkAddNew(long j, String str, LLM llm, DynamicObject dynamicObject, long j2) {
        DB.update(DBRoute.of(Constant.DB_KEY), "update t_gai_repo_doc_manage set ffilestatus=?  where fentryid=?", new Object[]{"A", Long.valueOf(j2)});
        // "DELETE <table>" without FROM is not accepted by every SQL dialect
        // (e.g. MySQL, PostgreSQL); the explicit FROM form is portable.
        DB.update(DBRoute.of(Constant.DB_KEY), "delete from t_gai_text_chunk where ffileid =?", new Object[]{Long.valueOf(j2)});
        addChunkTask(j, str, llm, dynamicObject);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a {@link ChunkTask} for the given document record and submits it to
     * the chunk pool together with the current {@link RequestContext}.
     *
     * <p>If building or submitting the task throws, the error is logged, the
     * repository row and the document row are both set to status {@code "D"},
     * and {@code false} is returned.
     *
     * @param j             repository id
     * @param str           chunk strategy name, parsed via {@code ChunkRule.ChunkStrategy.parse}
     * @param llm           model descriptor used to select the chunk rule
     * @param dynamicObject document record; its file path, type, source and
     *                      primary key (the file id) are read
     * @return {@code true} if the task was submitted, {@code false} on failure
     */
    public static boolean addChunkTask(long j, String str, LLM llm, DynamicObject dynamicObject) {
        String filePath = dynamicObject.getString(Constant.RepoInfo.file_path);
        String fileType = dynamicObject.getString(Constant.RepoInfo.file_type);
        String fileSource = dynamicObject.getString(Constant.RepoInfo.file_source);
        long fileId = ((Long) dynamicObject.getPkValue()).longValue();
        try {
            LOGGER.info(String.format("【GPT-知识库】加入分块任务-开始 repoId: %s,fileId:%s", Long.valueOf(j), Long.valueOf(fileId)));
            ChunkConfig chunkConfig = new ChunkConfig(
                    ChunkRule.parse(llm, ChunkRule.ChunkStrategy.parse(str)),
                    RepoSourceType.parse(fileSource, fileType));
            ChunkInput chunkInput = new ChunkInput(j, fileId, filePath, fileType, chunkConfig);
            REPO_CHUNK_POOL.execute(new ChunkTask(chunkInput), RequestContext.get());
            LOGGER.info(String.format("【GPT-知识库】加入分块任务-完成 repoId: %s,fileId:%s", Long.valueOf(j), Long.valueOf(fileId)));
            return true;
        } catch (Exception e) {
            // Report the chunk pool here: this task is submitted to REPO_CHUNK_POOL,
            // not to the embedding pool.
            LOGGER.error("【知识库-分块】任务启动失败 {} 执行器，异常信息：{}", new Object[]{CHUNK_POOL_NAME, e.getMessage(), e});
            DB.update(DBRoute.of(Constant.DB_KEY), "update t_gai_repo_info set fstatus=? where fid = ?", new Object[]{"D", Long.valueOf(j)});
            DB.update(DBRoute.of(Constant.DB_KEY), "update t_gai_repo_doc_manage set ffilestatus=?  where fentryid=?", new Object[]{"D", Long.valueOf(fileId)});
            return false;
        }
    }

    /**
     * Submits an {@link EmbeddingTask} for the given repository and file to the
     * embedding pool together with the current {@link RequestContext}.
     *
     * <p>If submission throws, the error is logged, the repository row is set to
     * status {@code "D"}, the document row is set to the embedding-error status,
     * and {@code false} is returned.
     *
     * @param j   repository id
     * @param llm model descriptor handed to the task
     * @param j2  file id
     * @return {@code true} if the task was submitted, {@code false} on failure
     */
    public static boolean addEmbeddingTask(long j, LLM llm, long j2) {
        boolean submitted;
        try {
            final String startMsg = String.format("【知识库-Embedding】启动任务 repoId:%s,fileId:%s", j, j2);
            LOGGER.info(startMsg);

            final EmbeddingParam param = new EmbeddingParam(j, j2);
            final EmbeddingTask task = new EmbeddingTask(llm, param);
            REPO_EMBEDDING_POOL.execute(task, RequestContext.get());
            submitted = true;
        } catch (Exception e) {
            LOGGER.error("【知识库-Embedding】任务启动失败 {} 执行器，异常信息：{}", new Object[]{EMBEDDING_POOL_NAME, e.getMessage(), e});

            // Mark the repository first, then the individual document row.
            final Object[] repoArgs = {"D", j};
            DB.update(DBRoute.of(Constant.DB_KEY), "update t_gai_repo_info set fstatus=? where fid = ?", repoArgs);

            final Object[] docArgs = {Constant.RepoInfo.file_status_err_embedding, j2};
            DB.update(DBRoute.of(Constant.DB_KEY), "update t_gai_repo_doc_manage set ffilestatus=?  where fentryid=?", docArgs);
            submitted = false;
        }
        return submitted;
    }
}
