package kd.bos.gptas.milvus;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import kd.bos.cache.CacheFactory;
import kd.bos.cache.DistributeCacheHAPolicy;
import kd.bos.cache.DistributeSessionlessCache;
import kd.bos.dataentity.serialization.SerializationUtils;
import kd.bos.dataentity.utils.StringUtils;
import kd.bos.entity.cache.CacheKeyUtil;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.threads.ThreadPools;

/* loaded from: input_file:kd/bos/gptas/milvus/MilvusInsertThread.class */
class MilvusInsertThread {
    private static final Log logger = LogFactory.getLog(MilvusInsertThread.class);
    private static final DistributeSessionlessCache cache = CacheFactory.getCommonCacheFactory().getDistributeSessionlessCache((String) null, new DistributeCacheHAPolicy(true, true));
    private final MilvusDao milvusDao;
    private final String taskId;

    public MilvusInsertThread(MilvusDao milvusDao, String str) {
        this.milvusDao = milvusDao;
        this.taskId = str;
    }

    public boolean addChunkListAndStartThread(List<Chunk> list, Consumer<List<Chunk>> consumer) {
        if (list == null || list.isEmpty()) {
            return false;
        }
        String[] strArr = new String[list.size()];
        int i = 0;
        Iterator<Chunk> it = list.iterator();
        while (it.hasNext()) {
            int i2 = i;
            i++;
            strArr[i2] = SerializationUtils.toJsonString(it.next());
        }
        cache.addList(getChunkListKey(), strArr, 28800);
        if (cache.inc(getSingleThreadKey(), 180) != 1) {
            return false;
        }
        ThreadPools.executeOnce(getClass().getName() + "." + this.taskId, () -> {
            batchInsertRunner(consumer);
        });
        return true;
    }

    private void batchInsertRunner(Consumer<List<Chunk>> consumer) {
        try {
            try {
                String chunkListKey = getChunkListKey();
                long currentTimeMillis = System.currentTimeMillis();
                for (String[] list = cache.getList(chunkListKey); list != null && list.length > 0; list = cache.getList(chunkListKey)) {
                    logger.info("start batch size: {}  ", Integer.valueOf(list.length));
                    if (System.currentTimeMillis() - currentTimeMillis > 170000) {
                        cache.inc(getSingleThreadKey(), 180);
                        currentTimeMillis = System.currentTimeMillis();
                    }
                    cache.removeListObjects(chunkListKey, 0, list.length);
                    ArrayList arrayList = new ArrayList(list.length);
                    for (String str : list) {
                        arrayList.add(SerializationUtils.fromJsonString(str, Chunk.class));
                    }
                    Set<Long> submitIdSet = ((MilvusDaoImpl) this.milvusDao).getSubmitIdSet(arrayList);
                    arrayList.removeIf(chunk -> {
                        return !submitIdSet.contains(Long.valueOf(chunk.getId()));
                    });
                    List<Chunk> batchInsert = this.milvusDao.batchInsert(arrayList);
                    try {
                        logger.info("finished size: {}  ", Integer.valueOf(batchInsert.size()));
                        consumer.accept(batchInsert);
                        Set<Long> submitIdSet2 = ((MilvusDaoImpl) this.milvusDao).getSubmitIdSet(arrayList);
                        arrayList.removeIf(chunk2 -> {
                            return !submitIdSet2.contains(Long.valueOf(chunk2.getId()));
                        });
                        ArrayList arrayList2 = new ArrayList(16);
                        Iterator it = arrayList.iterator();
                        while (it.hasNext()) {
                            arrayList2.add(Long.valueOf(((Chunk) it.next()).getId()));
                        }
                        if (!arrayList2.isEmpty()) {
                            this.milvusDao.delByIdList(arrayList2);
                        }
                    } catch (Exception e) {
                        logger.error(e);
                    }
                }
                cache.remove(getSingleThreadKey());
            } catch (Exception e2) {
                logger.error(e2);
                cache.remove(getSingleThreadKey());
            }
        } catch (Throwable th) {
            cache.remove(getSingleThreadKey());
            throw th;
        }
    }

    private String getChunkListKey() {
        return CacheKeyUtil.getAcctId() + ".qa_chunklist." + this.taskId;
    }

    private static String getDelChunkListKey() {
        return CacheKeyUtil.getAcctId() + ".qa_delchunklist";
    }

    private String getSingleThreadKey() {
        return getSingleThreadKey(this.taskId);
    }

    private static String getSingleThreadKey(String str) {
        return CacheKeyUtil.getAcctId() + ".qa_chunkthread." + str;
    }

    public static boolean isThreadRunning(String str) {
        return StringUtils.isNotBlank((CharSequence) cache.get(getSingleThreadKey(str)));
    }
}
