package kd.isc.iscb.platform.core.dc.e;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import kd.bos.dataentity.entity.DynamicObject;
import kd.bos.dataentity.resource.ResManager;
import kd.isc.iscb.platform.core.dc.meta.DataCopyConsumer;
import kd.isc.iscb.platform.core.dc.mq.MQUtil;
import kd.isc.iscb.platform.core.dc.mq.MessageQueueManager;
import kd.isc.iscb.platform.core.dc.mq.MessageQueueServer;
import kd.isc.iscb.platform.core.fn.ext.Functions;
import kd.isc.iscb.util.connector.SaveDataType;
import kd.isc.iscb.util.dt.D;
import kd.isc.iscb.util.except.IscBizException;
import kd.isc.iscb.util.io.ObjectWriter;
import kd.isc.iscb.util.misc.Json;
import kd.isc.iscb.util.script.Script;

/* loaded from: input_file:kd/isc/iscb/platform/core/dc/e/MQueueWriter.class */
public class MQueueWriter implements ObjectWriter<Map<String, Object>> {
    private static final String ACTION = "$action";
    private DynamicObject publisher;
    private DataCopyConsumer param;
    private Script formatScript;
    private MessageQueueServer server;
    private int batchSize;
    private boolean isArray;
    private static ThreadLocal<Map<MQueueWriter, List<Map<String, Object>>>> batchCache = new ThreadLocal<>();
    private int topicCount;

    /* JADX INFO: Access modifiers changed from: package-private */
    public MQueueWriter(DataCopyConsumer dataCopyConsumer, DynamicObject dynamicObject, int i) {
        this.param = dataCopyConsumer;
        this.publisher = dynamicObject;
        this.topicCount = i;
        this.server = MessageQueueManager.get(dynamicObject.getLong("group_id"));
        this.isArray = this.publisher.getBoolean("multi_line");
        this.batchSize = this.isArray ? dataCopyConsumer.getBatchSize() : 1;
        compileFormatScript();
    }

    public MQueueWriter(DataCopyConsumer dataCopyConsumer) {
        this(dataCopyConsumer, dataCopyConsumer.getPublisherQueue(), 1);
    }

    private void compileFormatScript() {
        String s = D.s(this.publisher.get("format_script_tag"));
        if (s == null) {
            s = D.s(this.publisher.get("format_script"));
        }
        if (s != null) {
            this.formatScript = Script.compile(s);
        }
    }

    public void write(Map<String, Object> map) {
        List<Map<String, Object>> batch = getBatch();
        batch.add(map);
        if (batch.size() >= this.batchSize) {
            flush();
        }
    }

    private void flush() {
        List<Map<String, Object>> batch = getBatch();
        if (batch.size() == 0) {
            return;
        }
        try {
            presetFailed();
            publish(batch);
            setSuccess(batch);
        } finally {
            batch.clear();
        }
    }

    private void publish(List<Map<String, Object>> list) {
        MQUtil.publish(serialize(list), this.server, this.publisher, getDataProducer(this.param));
    }

    public static String getDataProducer(DataCopyConsumer dataCopyConsumer) {
        return dataCopyConsumer.getDataProducerDesc();
    }

    private void setSuccess(List<Map<String, Object>> list) {
        Iterator<Map<String, Object>> it = list.iterator();
        while (it.hasNext()) {
            it.next().put(ACTION, SaveDataType.UNKNOWN);
        }
    }

    private List<Map<String, Object>> getBatch() {
        Map<MQueueWriter, List<Map<String, Object>>> map = batchCache.get();
        if (map == null) {
            map = new HashMap(this.topicCount);
            batchCache.set(map);
        }
        List<Map<String, Object>> list = map.get(this);
        if (list == null) {
            list = new ArrayList();
            map.put(this, list);
        }
        return list;
    }

    private void presetFailed() {
        Iterator<Map<String, Object>> it = getBatch().iterator();
        while (it.hasNext()) {
            it.next().put(ACTION, SaveDataType.FAILED);
        }
    }

    private String serialize(List<Map<String, Object>> list) {
        return this.formatScript != null ? formatByScript(list) : this.isArray ? formatArray(list) : Json.toString(dup(list.get(0)), true);
    }

    private String formatArray(List<Map<String, Object>> list) {
        ArrayList arrayList = new ArrayList(list.size());
        Iterator<Map<String, Object>> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(dup(it.next()));
        }
        return Json.toString(arrayList, true);
    }

    private String formatByScript(List<Map<String, Object>> list) {
        HashMap hashMap = new HashMap();
        hashMap.put("$data", this.isArray ? list : list.get(0));
        Object eval = this.formatScript.eval(hashMap);
        return eval instanceof String ? (String) eval : Json.toString(eval, true);
    }

    private Map<?, ?> dup(Map<String, Object> map) {
        LinkedHashMap linkedHashMap = new LinkedHashMap(map.size() + 10);
        linkedHashMap.putAll(map);
        linkedHashMap.remove(ACTION);
        linkedHashMap.put("$actions", this.param.getTargetActions());
        linkedHashMap.put("$data_handler", this.param.getTargetDataHandler());
        DynamicObject targetSchema = this.param.getTargetSchema();
        if (targetSchema != null) {
            linkedHashMap.put("$meta_type", targetSchema.get("type"));
            linkedHashMap.put("$meta_name", targetSchema.get("full_name"));
        }
        return linkedHashMap;
    }

    public void close() {
        Map<MQueueWriter, List<Map<String, Object>>> map = batchCache.get();
        if (map == null) {
            return;
        }
        if (!map.remove(this).isEmpty()) {
            throw new IscBizException(ResManager.loadKDString("关闭前必须调用 commit 或 rollback。", "MQueueWriter_1", "isc-iscb-platform-core", new Object[0]));
        }
        if (this.topicCount == 1) {
            batchCache.remove();
            if (!map.isEmpty()) {
                throw new IscBizException(ResManager.loadKDString("调用者的调用方式不正确，当前是单主题模式，但当前线程存在多个输出实例。", "MQueueWriter_0", "isc-iscb-platform-core", new Object[0]));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void disposeForMultiTopicMode() {
        Map<MQueueWriter, List<Map<String, Object>>> map = batchCache.get();
        batchCache.remove();
        if (!map.isEmpty()) {
            throw new IscBizException(ResManager.loadKDString("调用者的调用方式不正确，当前是多主题模式，但部分输出实例未正确关闭。", "MQueueWriter_2", "isc-iscb-platform-core", new Object[0]));
        }
    }

    public void commit() {
        flush();
    }

    public boolean rollback(Throwable th) {
        List<Map<String, Object>> batch = getBatch();
        Iterator<Map<String, Object>> it = batch.iterator();
        while (it.hasNext()) {
            this.param.saveTargetErrorLog(th, it.next());
        }
        batch.clear();
        return false;
    }

    static {
        Functions.init();
    }
}
