package kd.bos.gptas.milvus;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONPath;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.parser.Feature;
import com.google.gson.JsonObject;
import io.milvus.client.MilvusServiceClient;
import io.milvus.common.clientenum.ConsistencyLevelEnum;
import io.milvus.grpc.DataType;
import io.milvus.grpc.QueryResults;
import io.milvus.grpc.SearchResults;
import io.milvus.param.ConnectParam;
import io.milvus.param.IndexType;
import io.milvus.param.MetricType;
import io.milvus.param.R;
import io.milvus.param.collection.CreateCollectionParam;
import io.milvus.param.collection.FieldType;
import io.milvus.param.collection.HasCollectionParam;
import io.milvus.param.collection.LoadCollectionParam;
import io.milvus.param.dml.DeleteParam;
import io.milvus.param.dml.InsertParam;
import io.milvus.param.dml.QueryParam;
import io.milvus.param.dml.SearchParam;
import io.milvus.param.index.CreateIndexParam;
import io.milvus.response.QueryResultsWrapper;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import kd.bos.cache.CacheFactory;
import kd.bos.cache.DistributeCacheHAPolicy;
import kd.bos.cache.DistributeSessionlessCache;
import kd.bos.context.RequestContext;
import kd.bos.dataentity.resource.ResManager;
import kd.bos.dataentity.serialization.SerializationUtils;
import kd.bos.dataentity.utils.StringUtils;
import kd.bos.encrypt.Encrypters;
import kd.bos.entity.cache.CacheKeyUtil;
import kd.bos.exception.KDBizException;
import kd.bos.exception.KDException;
import kd.bos.gptas.agent.AiChatModelFactory;
import kd.bos.gptas.openapi.OpenApiClient;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.servicehelper.DispatchServiceHelper;
import kd.bos.session.SystemPropertyUtils;
import kd.bos.util.CollectionUtils;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:kd/bos/gptas/milvus/MilvusDaoImpl.class */
public class MilvusDaoImpl implements MilvusDao {
    private static final int MAX_TOP = 200;
    private static final String SEARCH_PARAM = "{\"nprobe\":10, \"offset\":0}";
    private MilvusServiceClient milvusServiceClient = null;
    private static OpenApiClient openApiClient;
    private final Config config;
    private static final Log log = LogFactory.getLog(MilvusDaoImpl.class);
    private static final DistributeSessionlessCache cache = CacheFactory.getCommonCacheFactory().getDistributeSessionlessCache((String) null, new DistributeCacheHAPolicy(true, true));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kd/bos/gptas/milvus/MilvusDaoImpl$Config.class */
    public enum Config {
        AZURE_EMBEDDING_ADA_002(1536, "AZURE_EMBEDDING_ADA_002", "repo file chunk aure 1536 dimension vector.") { // from class: kd.bos.gptas.milvus.MilvusDaoImpl.Config.1
            @Override // kd.bos.gptas.milvus.MilvusDaoImpl.Config
            public List<List<Float>> getVector(String str) {
                return Collections.singletonList(JSON.parseObject(JSON.toJSONString((List) JSONPath.read(str, "$.data[0].embedding", List.class)), new TypeReference<List<Float>>() { // from class: kd.bos.gptas.milvus.MilvusDaoImpl.Config.1.1
                }, new Feature[0]));
            }
        },
        BAIDU_EMBEDDING_V1(384, "BAIDU_EMBEDDING_V1", "repo file chunk baidu 384 dimension vector.") { // from class: kd.bos.gptas.milvus.MilvusDaoImpl.Config.2
            @Override // kd.bos.gptas.milvus.MilvusDaoImpl.Config
            public List<List<Float>> getVector(String str) {
                JSONObject parseObject = JSON.parseObject(str);
                if (!parseObject.containsKey("usage")) {
                    throw new KDBizException(String.format(ResManager.loadKDString("embedding 调用错误%s", "MilvusDaoImpl_0", "bos-devportal-gptas", new Object[0]), str));
                }
                JSONArray jSONArray = parseObject.getJSONArray("data");
                ArrayList arrayList = new ArrayList(384);
                for (int i = 0; i < jSONArray.size(); i++) {
                    arrayList.add((List) JSON.parseObject(jSONArray.getJSONObject(i).getString("embedding"), new TypeReference<List<Float>>() { // from class: kd.bos.gptas.milvus.MilvusDaoImpl.Config.2.1
                    }, new Feature[0]));
                }
                return arrayList;
            }
        },
        KINGDEE_EMBEDDING(768, "KINGDEE_EMBEDDING", "repo file chunk aure 768 dimension vector. ") { // from class: kd.bos.gptas.milvus.MilvusDaoImpl.Config.3
            @Override // kd.bos.gptas.milvus.MilvusDaoImpl.Config
            public List<List<Float>> getVector(String str) {
                JSONArray jSONArray = JSON.parseObject(str).getJSONArray("data");
                ArrayList arrayList = new ArrayList(768);
                for (int i = 0; i < jSONArray.size(); i++) {
                    arrayList.add((List) JSON.parseObject(jSONArray.getJSONObject(i).getString("embedding"), new TypeReference<List<Float>>() { // from class: kd.bos.gptas.milvus.MilvusDaoImpl.Config.3.1
                    }, new Feature[0]));
                }
                return arrayList;
            }
        };

        private final int dimension;
        private final String indexMethod;
        private final String desc;

        abstract List<List<Float>> getVector(String str);

        Config(int i, String str, String str2) {
            this.dimension = i;
            this.indexMethod = str;
            this.desc = str2;
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.milvusServiceClient != null) {
            this.milvusServiceClient.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Set<Long> getSubmitIdSet(List<Chunk> list) {
        return new HashSet((Collection) list.stream().map(chunk -> {
            return Long.valueOf(chunk.getId());
        }).collect(Collectors.toList()));
    }

    public MilvusDaoImpl(String str) {
        this.config = Config.valueOf(str);
        init();
    }

    private void createIndex() {
        doCreateIndex(CreateIndexParam.newBuilder().withCollectionName(getCollectionName()).withFieldName("vector").withIndexType(IndexType.IVF_FLAT).withMetricType(MetricType.L2).withExtraParam(getDimensionParam()).withSyncMode(Boolean.FALSE).build());
    }

    private String getCollectionName() {
        return String.format("M_%s_%s", RequestContext.get().getAccountId(), this.config.indexMethod);
    }

    private int getDimension() {
        return this.config.dimension;
    }

    private String getDimensionParam() {
        return String.format("{\"nlist\":%s}", Integer.valueOf(getDimension()));
    }

    @Override // kd.bos.gptas.milvus.MilvusDao
    public List<Chunk> search(List<Long> list, String str) {
        return search(list, str, 5);
    }

    @Override // kd.bos.gptas.milvus.MilvusDao
    public List<Chunk> search(List<Long> list, String str, int i) {
        return searchIds(getVectors(Collections.singletonList(str)).get(0), list, i);
    }

    @Override // kd.bos.gptas.milvus.MilvusDao
    public List<Chunk> searchByChunks(List<Long> list, List<Long> list2, String str, int i) {
        return toChunkList(search(getVectors(Collections.singletonList(str)).get(0), list, list2, i));
    }

    private List<Chunk> toChunkList(SearchResults searchResults) {
        if (searchResults == null) {
            return Collections.emptyList();
        }
        List<Long> dataList = searchResults.getResults().getIds().getIntId().getDataList();
        if (dataList.isEmpty()) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList(dataList.size());
        int i = 0;
        for (Long l : dataList) {
            Chunk chunk = new Chunk();
            chunk.setId(l.longValue());
            int i2 = i;
            i++;
            chunk.setScores(searchResults.getResults().getScores(i2));
            arrayList.add(chunk);
        }
        return arrayList;
    }

    private Map<String, String> callOpenAPI(String str, String str2) {
        if (openApiClient == null) {
            openApiClient = OpenApiClient.builder().build();
        }
        HashMap hashMap = new HashMap(16);
        hashMap.put("indexmethod", str);
        hashMap.put("text", str2);
        JsonObject requestPost = openApiClient.requestPost("/kapi/v2/kdtest/aicc/gptas_synccall", SerializationUtils.toJsonString(hashMap));
        HashMap hashMap2 = new HashMap(16);
        JsonObject asJsonObject = requestPost.getAsJsonObject("data").getAsJsonObject("result");
        for (String str3 : asJsonObject.keySet()) {
            hashMap2.put(str3, asJsonObject.get(str3).getAsString());
        }
        return hashMap2;
    }

    private List<List<Float>> getVectors(List<String> list) {
        HashMap hashMap = new HashMap();
        hashMap.put("input", list);
        String userName = RequestContext.get().getUserName();
        long currentTimeMillis = System.currentTimeMillis();
        HashMap hashMap2 = new HashMap();
        hashMap2.put("stream", "false");
        log.info("用户{}开始调用embedding{} ", new Object[]{userName, "syncService", JSON.toJSON(hashMap)});
        Map<String, String> callAICCService = callAICCService(hashMap, hashMap2);
        log.info("{} {} {} {}", new Object[]{userName, "syncService", JSON.toJSON(callAICCService), Long.valueOf(System.currentTimeMillis() - currentTimeMillis)});
        callAICCService.get("id");
        String str = callAICCService.get("message");
        String str2 = callAICCService.get("errorCode");
        if (!"0".equals(str2)) {
            if (((String) cache.get(getErrorTimeStampKey())) != null) {
                throwEmbeddingException(str2, str);
            }
            callAICCService = callAICCService(hashMap, hashMap2);
            callAICCService.get("id");
            String str3 = callAICCService.get("message");
            String str4 = callAICCService.get("errorCode");
            if (!"0".equals(str4)) {
                cache.put(getErrorTimeStampKey(), String.valueOf(System.currentTimeMillis()), 180);
                throwEmbeddingException(str4, str3);
            }
        }
        return this.config.getVector(callAICCService.get("result"));
    }

    private void throwEmbeddingException(String str, String str2) {
        throw new KDBizException(String.format(ResManager.loadKDString("embedding调用出错%1$s):%2$s", "MilvusDaoImpl_1", "bos-devportal-gptas", new Object[0]), str, str2));
    }

    private String getErrorTimeStampKey() {
        return CacheKeyUtil.getAcctId() + "." + this.config.indexMethod + ".errortimestamp";
    }

    private Map<String, String> callAICCService(Map<String, Object> map, Map<String, String> map2) {
        return String.valueOf(Boolean.TRUE).equals(SystemPropertyUtils.getProptyByTenant(AiChatModelFactory.GPT_LOCAL_ENABLE, String.valueOf(Boolean.FALSE))) ? callOpenAPI(this.config.indexMethod, SerializationUtils.toJsonString(map)) : (Map) DispatchServiceHelper.invokeBizService("ai", "aicc", "AiccService", "syncService", new Object[]{map2, this.config.indexMethod, JSON.toJSON(map).toString()});
    }

    @Override // kd.bos.gptas.milvus.MilvusDao
    public List<Chunk> searchIds(List<Float> list, List<Long> list2) {
        return searchIds(list, list2, 5);
    }

    @Override // kd.bos.gptas.milvus.MilvusDao
    public List<Chunk> searchIds(List<Float> list, List<Long> list2, int i) {
        return toChunkList(search(list, list2, null, i));
    }

    private boolean isEmpty(String str) {
        return str == null || str.trim().isEmpty();
    }

    private String decode(String str) {
        return Encrypters.isEncrypted(str) ? Encrypters.decode(str) : str;
    }

    private MilvusServiceClient getClient() {
        if (this.milvusServiceClient == null) {
            String tenantId = RequestContext.get().getTenantId();
            String proptyByTenant = SystemPropertyUtils.getProptyByTenant("milvus.host", tenantId);
            String proptyByTenant2 = SystemPropertyUtils.getProptyByTenant("milvus.user", tenantId);
            String decode = decode(SystemPropertyUtils.getProptyByTenant("milvus.passwd", tenantId));
            if (isEmpty(proptyByTenant) || isEmpty(proptyByTenant2) || isEmpty(decode)) {
                throw new RuntimeException(ResManager.loadKDString("需要在MC中配置milvus.host,milvus.user,milvus.passwd", "MilvusDaoImpl_2", "bos-devportal-gptas", new Object[0]));
            }
            this.milvusServiceClient = new MilvusServiceClient(ConnectParam.newBuilder().withHost(proptyByTenant).withPort(SystemPropertyUtils.getInteger(tenantId, "milvus.port", 19530).intValue()).withAuthorization(proptyByTenant2, decode).withConnectTimeout(3L, TimeUnit.MINUTES).build());
            this.milvusServiceClient.withTimeout(2L, TimeUnit.MINUTES);
        }
        return this.milvusServiceClient;
    }

    private void init() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(FieldType.newBuilder().withName("vector").withDataType(DataType.FloatVector).withDimension(Integer.valueOf(getDimension())).withAutoID(false).build());
        arrayList.add(FieldType.newBuilder().withName("repoId").withDataType(DataType.Int64).build());
        if (doCreateCollection(arrayList)) {
            log.info("collectionName:{} milvus createIndex start", getCollectionName());
            createIndex();
            log.info("collectionName:{} milvus createIndex end", getCollectionName());
        }
        log.info("collectionName:{} milvus doLoadCollection start", getCollectionName());
        doLoadCollection();
        log.info("collectionName:{} milvus doLoadCollection end", getCollectionName());
    }

    private void doCreateIndex(CreateIndexParam createIndexParam) {
        MilvusServiceClient client = getClient();
        if (client == null) {
            return;
        }
        log.info("{} 索引 {} 创建完成，result：{}", new Object[]{getCollectionName(), createIndexParam.getCollectionName(), client.createIndex(createIndexParam)});
    }

    private boolean doCreateCollection(List<FieldType> list) {
        log.info("milvus doCreateCollection start ");
        MilvusServiceClient client = getClient();
        if (client == null) {
            log.info("createCollection milvus未启用");
            return false;
        }
        log.info("milvus hasCollection start ");
        R hasCollection = client.hasCollection(HasCollectionParam.newBuilder().withCollectionName(getCollectionName()).build());
        if (hasCollection.getData() == null) {
            String message = hasCollection.getException().getMessage();
            if (message.contains("username") || message.contains("password") || message.contains("correct")) {
                log.info("createCollection milvus账号密码不对!!!!!!!");
                client.close();
                return false;
            }
        }
        log.info("milvus hasCollection end result:{} ", hasCollection);
        if (hasCollection.getData() == Boolean.TRUE) {
            log.info("{} collection 已经存在，不再重复创建", getCollectionName());
            return false;
        }
        CreateCollectionParam.Builder withShardsNum = CreateCollectionParam.newBuilder().withCollectionName(getCollectionName()).withDescription(getCollectionDescription()).withShardsNum(2);
        withShardsNum.addFieldType(FieldType.newBuilder().withName("id").withDataType(DataType.Int64).withPrimaryKey(true).withAutoID(false).build());
        for (FieldType fieldType : list) {
            if (!fieldType.getName().equals("id")) {
                withShardsNum.addFieldType(fieldType);
            }
        }
        CreateCollectionParam build = withShardsNum.build();
        log.info("{} 创建 开始", getCollectionName());
        R createCollection = client.createCollection(build);
        log.info("{} 创建完成，result：{}", getCollectionName(), createCollection);
        return createCollection.getStatus().intValue() == R.Status.Success.getCode();
    }

    private String getCollectionDescription() {
        return this.config.desc;
    }

    private void doLoadCollection() {
        MilvusServiceClient client = getClient();
        if (client == null) {
            log.error("loadCollection {} milvusServiceClient 创建失败", getCollectionName());
        } else {
            log.info("loadCollection {} result {}", getCollectionName(), client.loadCollection(LoadCollectionParam.newBuilder().withCollectionName(getCollectionName()).build()));
        }
    }

    @Override // kd.bos.gptas.milvus.MilvusDao
    public boolean insert(Chunk chunk) {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        if (chunk.getVector().isEmpty()) {
            chunk.setVector(getVectors(Collections.singletonList(chunk.getChunk())).get(0));
        }
        if (chunk.getVector() == null || chunk.getVector().size() != getDimension()) {
            throw new KDBizException(String.format(ResManager.loadKDString("向量size不对%s", "MilvusDaoImpl_3", "bos-devportal-gptas", new Object[0]), Integer.valueOf(chunk.getVector().size())));
        }
        arrayList.add(Long.valueOf(chunk.getId()));
        arrayList2.add(Long.valueOf(chunk.getRepositoryId()));
        arrayList3.add(chunk.getVector());
        ArrayList arrayList4 = new ArrayList();
        arrayList4.add(new InsertParam.Field("id", arrayList));
        arrayList4.add(new InsertParam.Field("repoId", arrayList2));
        arrayList4.add(new InsertParam.Field("vector", arrayList3));
        R insert = getClient().insert(InsertParam.newBuilder().withCollectionName(getCollectionName()).withFields(arrayList4).build());
        log.info("milvus batch insert {}", insert);
        return insert.getStatus().intValue() != R.Status.Success.getCode();
    }

    @Override // kd.bos.gptas.milvus.MilvusDao
    public List<Chunk> batchInsert(List<Chunk> list) {
        if (list == null || list.isEmpty()) {
            throw new KDBizException(ResManager.loadKDString("分段为空，可能是空文件", "MilvusDaoImpl_4", "bos-devportal-gptas", new Object[0]));
        }
        int size = list.size();
        ArrayList arrayList = new ArrayList(size);
        ArrayList arrayList2 = new ArrayList(size);
        ArrayList arrayList3 = new ArrayList(size);
        for (Chunk chunk : list) {
            try {
                if (chunk.getVector().isEmpty()) {
                    chunk.setVector(getVectors(Collections.singletonList(chunk.getChunk())).get(0));
                }
                arrayList3.add(chunk.getVector());
                arrayList.add(Long.valueOf(chunk.getId()));
                arrayList2.add(Long.valueOf(chunk.getRepositoryId()));
            } catch (KDException e) {
                chunk.setFailMsg(e.getMessage());
            }
        }
        ArrayList arrayList4 = new ArrayList();
        arrayList4.add(new InsertParam.Field("id", arrayList));
        arrayList4.add(new InsertParam.Field("repoId", arrayList2));
        arrayList4.add(new InsertParam.Field("vector", arrayList3));
        R insert = getClient().insert(InsertParam.newBuilder().withCollectionName(getCollectionName()).withFields(arrayList4).build());
        log.info("milvus batch insert {}", insert);
        if (insert.getStatus().intValue() != R.Status.Success.getCode()) {
            for (Chunk chunk2 : list) {
                if (chunk2.getFailMsg() == null) {
                    chunk2.setFailMsg(insert.getMessage());
                }
            }
        }
        return list;
    }

    @Override // kd.bos.gptas.milvus.MilvusDao
    public boolean batchInsertAsync(String str, List<Chunk> list, Consumer<List<Chunk>> consumer) {
        return new MilvusInsertThread(this, str).addChunkListAndStartThread(list, consumer);
    }

    private String orJoin(String str, List<?> list) {
        if (list == null || list.isEmpty()) {
            return "";
        }
        StringBuilder sb = new StringBuilder();
        Iterator<?> it = list.iterator();
        while (it.hasNext()) {
            sb.append(str).append(" == ").append(it.next()).append(" or ");
        }
        sb.delete(sb.length() - " or ".length(), sb.length());
        return sb.toString();
    }

    private SearchResults search(List<Float> list, List<Long> list2, List<Long> list3, int i) {
        log.info("milvus search params,top:{}, repoIdList:{}", Integer.valueOf(i), JSON.toJSONString(list2));
        int min = i == 0 ? 1 : Math.min(i, MAX_TOP);
        log.info("milvus search params,exec top:{}", Integer.valueOf(min));
        List singletonList = Collections.singletonList("repoId");
        List singletonList2 = Collections.singletonList(list);
        String orJoin = orJoin("repoId", list2);
        String orJoin2 = orJoin("id", list3);
        if (StringUtils.isNotBlank(orJoin2)) {
            orJoin = "(" + orJoin + ") and (" + orJoin2 + ")";
        }
        R search = getClient().search(SearchParam.newBuilder().withCollectionName(getCollectionName()).withMetricType(MetricType.L2).withOutFields(singletonList).withTopK(Integer.valueOf(min)).withVectors(singletonList2).withVectorFieldName("vector").withExpr(orJoin).withParams(SEARCH_PARAM).build());
        if (search.getStatus().intValue() == R.Status.Success.getCode()) {
            return (SearchResults) search.getData();
        }
        log.info("milvus search result {}", search);
        return null;
    }

    @Override // kd.bos.gptas.milvus.MilvusDao
    public void delByIdList(List<Long> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        StringBuilder sb = new StringBuilder("id");
        sb.append(" in [");
        sb.append((String) list.stream().map((v0) -> {
            return String.valueOf(v0);
        }).collect(Collectors.joining(",")));
        sb.append(']');
        log.info("milvus del {} ,result {}", sb, getClient().delete(DeleteParam.newBuilder().withCollectionName(getCollectionName()).withExpr(sb.toString()).build()));
    }

    @Override // kd.bos.gptas.milvus.MilvusDao
    public List<Chunk> query(List<Long> list, List<Long> list2, boolean z) {
        String str = " repoId in [" + ((String) list.stream().map((v0) -> {
            return String.valueOf(v0);
        }).collect(Collectors.joining(","))) + "]";
        String str2 = CollectionUtils.isNotEmpty(list2) ? "and id in [" + ((String) list2.stream().map((v0) -> {
            return String.valueOf(v0);
        }).collect(Collectors.joining(","))) + "]" : "";
        ArrayList arrayList = new ArrayList(2);
        arrayList.add("repoId");
        arrayList.add("id");
        arrayList.add("vector");
        R query = getClient().query(QueryParam.newBuilder().withCollectionName(getCollectionName()).withConsistencyLevel(ConsistencyLevelEnum.STRONG).withExpr(str + str2).withOutFields(arrayList).withOffset(0L).withLimit(Long.valueOf(CollectionUtils.isEmpty(list2) ? 100L : list2.size())).build());
        if (query.getStatus().intValue() != R.Status.Success.getCode()) {
            throw new KDBizException("milvus query err:" + query.getMessage());
        }
        QueryResultsWrapper queryResultsWrapper = new QueryResultsWrapper((QueryResults) query.getData());
        List fieldData = queryResultsWrapper.getFieldWrapper("repoId").getFieldData();
        List fieldData2 = queryResultsWrapper.getFieldWrapper("id").getFieldData();
        List fieldData3 = queryResultsWrapper.getFieldWrapper("vector").getFieldData();
        ArrayList arrayList2 = new ArrayList(fieldData.size());
        int i = 0;
        queryResultsWrapper.getFieldWrapper("vector").getDim();
        for (Object obj : fieldData) {
            Chunk chunk = new Chunk();
            chunk.setRepositoryId(((Long) obj).longValue());
            chunk.setId(((Long) fieldData2.get(i)).longValue());
            arrayList2.add(chunk);
            chunk.setVector((List) fieldData3.get(i));
            i++;
        }
        return arrayList2;
    }
}
