package kd.ai.gai.core.engine.handler;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import kd.ai.gai.core.Constant;
import kd.ai.gai.core.domain.dto.Chunk;
import kd.ai.gai.core.domain.llm.base.Result2User;
import kd.ai.gai.core.engine.EngineCache;
import kd.ai.gai.core.engine.Errors;
import kd.ai.gai.core.engine.json.JsonUtil;
import kd.ai.gai.core.engine.message.EmbeddingMessage;
import kd.ai.gai.core.rag.milvus.MilvusService;
import kd.ai.gai.core.service.RepoService;
import kd.ai.gai.core.service.embedding.EmbeddingService;
import kd.ai.gai.core.service.embedding.EmbeddingServiceFactory;
import kd.bos.context.RequestContext;
import kd.bos.dataentity.entity.DynamicObject;
import kd.bos.dataentity.serialization.SerializationUtils;
import kd.bos.dataentity.utils.StringUtils;
import kd.bos.db.DB;
import kd.bos.db.DBRoute;
import kd.bos.dlock.DLock;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.orm.query.QFilter;
import kd.bos.servicehelper.QueryServiceHelper;

/* loaded from: input_file:kd/ai/gai/core/engine/handler/EmbeddingHandler.class */
/**
 * Drives the asynchronous embedding round trip for knowledge-base text chunks.
 *
 * <p>{@link #process(EmbeddingMessage)} submits an embedding request and caches the
 * request metadata under the returned task id. {@link #callback(Result2User)} is the
 * counterpart: it reloads that metadata, writes the returned vectors to Milvus,
 * updates the chunk rows, and refreshes the owning document's status.
 */
public class EmbeddingHandler {
    private static final Log logger = LogFactory.getLog(EmbeddingHandler.class);
    /** Maximum time, in the unit expected by {@code DLock.tryLock(long)}, to wait for the per-file lock. */
    private static final long LOCK_WAIT_TIME = 10000;
    private static final String LOCK_KEY_PER = "gai_embedding_call_";
    /** Upper bound applied to the error text written to {@code t_gai_repo_doc_manage.flog}. */
    private static final int MAX_ERR_LOG_LENGTH = 2000;

    /**
     * Handles the result of an asynchronous embedding call.
     *
     * <p>On a non-OK result code, or on any exception raised while processing, the
     * failure is recorded through {@link #embeddingErrUpdate(Result2User, String)}.
     * If no cached request is found for the task id the method returns without
     * doing anything further.
     *
     * @param result2User the asynchronous call result; its task id identifies the cached request
     */
    public static void callback(Result2User result2User) {
        String taskId = result2User.getTaskId();
        try {
            if (!Errors.OK.getCode().equals(result2User.getCode())) {
                embeddingErrUpdate(result2User, "【知识库-Embedding】异步调用结果异常");
                return;
            }
            String cachedJson = (String) EngineCache.getAppCache(taskId).get(Constant.TaskCache.EMBEDDING_TASK_PRE + taskId, String.class);
            logger.info("【知识库-Embedding】embedding异步调用回调缓存获取，taskId({}),content：{}", taskId, cachedJson);
            if (StringUtils.isEmpty(cachedJson)) {
                return;
            }
            EmbeddingMessage embeddingMessage = (EmbeddingMessage) JsonUtil.fromJson(cachedJson, EmbeddingMessage.class);
            EmbeddingService executor = EmbeddingServiceFactory.getExecutor(embeddingMessage.getLlm());
            long repoId = embeddingMessage.getRepoId();
            long fileId = embeddingMessage.getFileId();
            embeddingMessage.setVectors(executor.resultParse(result2User).getVectorList());
            List<Long> chunkIds = embeddingMessage.getChunkIds();

            List<Chunk> chunks = buildChunks(taskId, repoId, chunkIds, embeddingMessage.getVectors());
            logger.info("【知识库-Embedding】向量数据,任务批次:{} ,大小:{} ,开始写入", taskId, chunkIds.size());
            MilvusService.getExecutor(embeddingMessage.getLlm()).bachInsert(chunks);
            markChunksStored(chunkIds);
            logger.info("【知识库-Embedding】向量数据,任务批次:{} 写入成功", taskId);

            updateDocStatusUnderLock(taskId, repoId, fileId);
        } catch (Exception e) {
            String errMsg = "【知识库-Embedding】callback异常" + e.getMessage();
            logger.error(errMsg, e);
            embeddingErrUpdate(result2User, errMsg);
        }
    }

    /**
     * Pairs every chunk id with the vector at the same index.
     *
     * @throws IllegalStateException if there are no chunk ids, or fewer vectors than chunk ids;
     *         failing here gives the caller's error path a descriptive message instead of an
     *         index error or an invalid {@code in ()} SQL clause later on
     */
    private static List<Chunk> buildChunks(String taskId, long repoId, List<Long> chunkIds, List<List<Float>> vectors) {
        if (chunkIds == null || chunkIds.isEmpty()) {
            throw new IllegalStateException("no chunk ids cached for embedding task " + taskId);
        }
        if (vectors == null || vectors.size() < chunkIds.size()) {
            throw new IllegalStateException(String.format(
                    "embedding task %s returned %d vectors for %d chunks",
                    taskId, vectors == null ? 0 : vectors.size(), chunkIds.size()));
        }
        List<Chunk> chunks = new ArrayList<>(chunkIds.size());
        for (int i = 0; i < chunkIds.size(); i++) {
            chunks.add(new Chunk(chunkIds.get(i).longValue(), repoId, vectors.get(i)));
        }
        return chunks;
    }

    /** Sets {@code fstatus} to {@code "C"} on every {@code t_gai_text_chunk} row whose id is in {@code chunkIds}. */
    private static void markChunksStored(List<Long> chunkIds) {
        List<Object> params = new ArrayList<>(chunkIds.size() + 1);
        params.add("C");
        params.addAll(chunkIds);
        String placeholders = String.join(",", Collections.nCopies(chunkIds.size(), "?"));
        DB.update(DBRoute.of(Constant.DB_KEY), "update t_gai_text_chunk set fstatus = ? where fid in (" + placeholders + ')', params.toArray());
    }

    /**
     * Refreshes the document status via {@code RepoService.updateRepoDocStatus}, serialised per
     * tenant/account/file with a distributed lock.
     */
    private static void updateDocStatusUnderLock(String taskId, long repoId, long fileId) {
        RequestContext requestContext = RequestContext.get();
        String lockKey = LOCK_KEY_PER.concat(requestContext.getTenantCode()).concat("_").concat(requestContext.getAccountId()).concat("_").concat(String.valueOf(fileId));
        logger.info(String.format("【GPT-知识库】-文件embedding异步回调任务, 锁Key：%s", lockKey));
        DLock lock = getLock(lockKey);
        boolean locked = lock.tryLock(LOCK_WAIT_TIME);
        if (!locked) {
            // NOTE(review): the message says the task is aborted, but the status update below has
            // always run regardless. That behaviour is kept; confirm whether it should return here.
            logger.error(String.format("【知识库-Embedding】embedding异步回调任务,获任务更新权超时，中断本次任务，知识库:%s, FileID: %s ", repoId, fileId));
        }
        try {
            RepoService.updateRepoDocStatus(repoId, fileId);
        } finally {
            logger.info("【知识库-Embedding】embedding异步回调任务,文件:【{}】，任务批次：【{}】完成, 释放锁：{}", new Object[]{fileId, taskId, lockKey});
            // Only release a lock this call actually acquired.
            if (locked) {
                lock.unlock();
            }
        }
    }

    /**
     * Records an embedding failure for the task carried by {@code result2User}.
     *
     * <p>Looks up the repository and file through a chunk row of the task, then sets status
     * {@code "D"} on the repository and on the task's chunks, and writes the (truncated) error
     * text to the document row unless that row is already in one of the excluded statuses.
     * If no chunk row exists for the task, the failure is only logged.
     *
     * @param result2User the asynchronous call result
     * @param str         short description of the failure
     */
    private static void embeddingErrUpdate(Result2User result2User, String str) {
        String taskId = result2User.getTaskId();
        String resultJson = SerializationUtils.toJsonString(result2User);
        DynamicObject chunkRow = QueryServiceHelper.queryOne(Constant.GaiTextChunk.form_id, String.join(",", "repoid", Constant.GaiTextChunk.file_id), new QFilter[]{new QFilter(Constant.GaiTextChunk.task_id, "=", taskId)});
        if (chunkRow == null) {
            // Without a chunk row there is no repo/file id to update; avoid an NPE that would
            // otherwise escape from callback's catch block and mask the original failure.
            logger.error("【知识库-Embedding】模型AICC回调,异常: {}, 未找到taskId对应的文本分片，无法更新状态, taskId:{},结果:{}", new Object[]{str, taskId, resultJson});
            return;
        }
        long repoId = chunkRow.getLong("repoid");
        long fileId = chunkRow.getLong(Constant.GaiTextChunk.file_id);
        String detail = String.format("traceId: %s ,errMsg:%s - %s", RequestContext.get().getTraceId(), str, resultJson);
        String errLog = detail.length() > MAX_ERR_LOG_LENGTH ? detail.substring(0, MAX_ERR_LOG_LENGTH) : detail;
        logger.error("【知识库-Embedding】模型AICC回调,异常: {}, repoId:{},fileId:{},结果:{}", new Object[]{str, repoId, fileId, resultJson});
        DB.update(DBRoute.of(Constant.DB_KEY), "update t_gai_repo_info set fstatus=?,fmodifytime=? where fid = ?", new Object[]{"D", new Timestamp(System.currentTimeMillis()), repoId});
        DB.update(DBRoute.of(Constant.DB_KEY), "update t_gai_text_chunk set fstatus=?  where ftaskid =? ", new Object[]{"D", taskId});
        DB.update(DBRoute.of(Constant.DB_KEY), "update t_gai_repo_doc_manage set ffilestatus=? ,flog=? where fentryid=? and ffilestatus not in (?,?,?)", new Object[]{Constant.RepoInfo.file_status_err_embedding, errLog, fileId, "D", Constant.RepoInfo.file_status_err_embedding, Constant.RepoInfo.file_status_err_del});
    }

    /**
     * Submits an asynchronous embedding request and caches the request metadata for the callback.
     *
     * <p>The message's chunks are cleared before caching, so only the remaining fields (including
     * the assigned task id) are stored under the task's cache key.
     *
     * @param embeddingMessage the embedding request; its task id is set and its chunks are cleared
     * @return the result returned by the embedding service's asynchronous call
     */
    public static Result2User process(EmbeddingMessage embeddingMessage) {
        Result2User asyncResult = EmbeddingServiceFactory.getExecutor(embeddingMessage.getLlm()).embeddingAsync(embeddingMessage);
        String taskId = asyncResult.getTaskId();
        embeddingMessage.setTaskId(taskId);
        embeddingMessage.setChunks(null);
        // Serialise once and reuse for both the cache entry and the log line.
        String messageJson = JsonUtil.messageToJson(embeddingMessage);
        EngineCache.getAppCache(taskId).put(Constant.TaskCache.EMBEDDING_TASK_PRE + taskId, messageJson);
        logger.info("【知识库-Embedding】异步调用回调缓存，taskId({}),content：{}", taskId, messageJson);
        return asyncResult;
    }

    /**
     * Creates the distributed lock for {@code str} and switches it to fast mode.
     *
     * @param str the lock key
     * @return the created, not yet acquired lock
     */
    private static DLock getLock(String str) {
        logger.info(String.format("【GPT-知识库】- embedding异步回调任务, 锁Key：%s", str));
        DLock lock = DLock.create(str, String.format("锁:%s,【GPT-知识库】embedding异步回调任务锁", str));
        lock.fastMode();
        return lock;
    }
}
