package kd.isc.iscx.platform.core.res.runtime.trigger;

import java.sql.Connection;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import kd.bos.context.RequestContext;
import kd.bos.dataentity.entity.DynamicObject;
import kd.bos.db.tx.TX;
import kd.bos.id.IDService;
import kd.bos.servicehelper.BusinessDataServiceHelper;
import kd.isc.iscb.platform.core.connector.ConnectorUtil;
import kd.isc.iscb.platform.core.dc.mq.DataFlowRouter;
import kd.isc.iscb.platform.core.dc.mq.MessageQueueInitiator;
import kd.isc.iscb.util.db.DbUtil;
import kd.isc.iscb.util.dt.D;
import kd.isc.iscb.util.misc.NetUtil;
import kd.isc.iscb.util.script.misc.SystemContext;
import kd.isc.iscx.platform.core.res.meta.event.AbstractEventModel;
import kd.isc.iscx.platform.core.res.meta.event.MQueueEvent;
import kd.isc.iscx.platform.core.res.runtime.Connector;
import kd.isc.iscx.platform.core.res.runtime.DataStreamTrigger;
import kd.isc.iscx.platform.core.res.runtime.job.DataStream;
import kd.isc.iscx.platform.core.res.runtime.job.DataTask;
import kd.isc.iscx.platform.core.res.runtime.job.task.FiberTask;

/* loaded from: input_file:kd/isc/iscx/platform/core/res/runtime/trigger/MQEventTrigger.class */
/**
 * {@link DataStreamTrigger} backed by a message-queue event.
 *
 * <p>Enabling the trigger looks up (or creates) an {@code isc_mq_subscriber} record whose number
 * is the event's MQ topic number, stores its id on the trigger row in
 * {@code t_iscx_datax_trigger}, enables it, and then asks {@link MessageQueueInitiator} to reset
 * the listeners of every subscriber group that was touched. Disabling does the reverse for the
 * subscriber currently stored on the trigger row.
 */
public class MQEventTrigger implements DataStreamTrigger {
    /** Route key passed to {@link TX#getConnection} for every query issued by this class. */
    private static final String DB_ROUTE = "ISCB";

    /** Entity name of the MQ subscriber records loaded and created by this class. */
    private static final String SUBSCRIBER_ENTITY = "isc_mq_subscriber";

    private final MQueueEvent event;
    private final Connector connector;
    private final long dataFlowTriggerId;
    private final String startNodeId;

    /**
     * @param event             the MQ event model this trigger listens for
     * @param connector         connector whose {@code getDbLink()} value is used as the subscriber's group id
     * @param dataFlowTriggerId primary key ({@code fid}) of this trigger's row in {@code t_iscx_datax_trigger}
     * @param startNodeId       id of the node handed to the first {@link FiberTask}
     */
    public MQEventTrigger(MQueueEvent event, Connector connector, long dataFlowTriggerId, String startNodeId) {
        this.event = event;
        this.connector = connector;
        this.dataFlowTriggerId = dataFlowTriggerId;
        this.startNodeId = startNodeId;
    }

    /** Returns the MQ event model supplied at construction. */
    @Override // kd.isc.iscx.platform.core.res.runtime.DataStreamTrigger
    public AbstractEventModel getEventModel() {
        return this.event;
    }

    /**
     * Increments the stream's total counter and creates the first task of the stream.
     *
     * @param dataStream the stream the task belongs to
     * @param map        passed through unchanged as the last {@link FiberTask} constructor argument
     * @return a new {@link FiberTask} starting at this trigger's start node with the event's params
     */
    @Override // kd.isc.iscx.platform.core.res.runtime.DataStreamTrigger
    public DataTask createFirstTask(DataStream dataStream, Map<String, Object> map) {
        dataStream.getCounter().incTotalCount();
        return new FiberTask(dataStream, this.startNodeId, this.event.getParams(), map);
    }

    /**
     * Disables the subscriber currently bound to this trigger (if any), clears the
     * {@link DataFlowRouter} cache and resets the listeners of the affected subscriber group.
     */
    @Override // kd.isc.iscx.platform.core.res.runtime.DataStreamTrigger
    public void disable() {
        Set<Long> affectedGroups = new HashSet<>(2);
        // disableOriginalMqTopic ignores a zero id, so no extra guard is needed here.
        disableOriginalMqTopic(getOriginalMqTopic(), affectedGroups);
        DataFlowRouter.clearCache();
        resetMqServerListeners(affectedGroups);
    }

    /**
     * Ensures the subscriber matching the event's topic exists, is bound to this trigger and is
     * enabled; a previously bound, different subscriber is disabled first. Afterwards the
     * {@link DataFlowRouter} cache is cleared and listeners of all affected groups are reset.
     */
    @Override // kd.isc.iscx.platform.core.res.runtime.DataStreamTrigger
    public void enable() {
        long originalTopicId = getOriginalMqTopic();
        long requiredTopicId = getOrCreateRequiredMqTopic();
        Set<Long> affectedGroups = new HashSet<>(2);
        if (originalTopicId != requiredTopicId) {
            // The trigger pointed at another (or no) subscriber: retire it and re-bind.
            disableOriginalMqTopic(originalTopicId, affectedGroups);
            bindRequiredMqTopic(requiredTopicId);
        }
        enableRequiredMqTopic(requiredTopicId, affectedGroups);
        DataFlowRouter.clearCache();
        resetMqServerListeners(affectedGroups);
    }

    /** Resets the MQ listeners of every non-zero group id in {@code set}. */
    private void resetMqServerListeners(Set<Long> set) {
        for (Long groupId : set) {
            if (groupId.longValue() != 0) {
                MessageQueueInitiator.resetListeners(groupId.longValue());
            }
        }
    }

    /**
     * Enables the subscriber with the given id unless it is already enabled, and records its
     * group id in {@code set}. A zero id is ignored.
     */
    private void enableRequiredMqTopic(long j, Set<Long> set) {
        if (j == 0) {
            return;
        }
        // NOTE(review): loadSingle's behavior for a missing record is not visible here; if it
        // can return null the next line throws NullPointerException - confirm.
        DynamicObject subscriber = BusinessDataServiceHelper.loadSingle(Long.valueOf(j), SUBSCRIBER_ENTITY);
        if (D.x(subscriber.get("enable"))) {
            return;
        }
        ConnectorUtil.enable(subscriber);
        set.add(Long.valueOf(D.l(subscriber.get("group_id"))));
    }

    /**
     * Disables the subscriber with the given id if it is currently enabled, and records its
     * group id in {@code set}. A zero id is ignored.
     */
    private void disableOriginalMqTopic(long j, Set<Long> set) {
        if (j == 0) {
            return;
        }
        // NOTE(review): same open question as in enableRequiredMqTopic regarding a null result.
        DynamicObject subscriber = BusinessDataServiceHelper.loadSingle(Long.valueOf(j), SUBSCRIBER_ENTITY);
        if (!D.x(subscriber.get("enable"))) {
            return;
        }
        ConnectorUtil.disable(subscriber);
        set.add(Long.valueOf(D.l(subscriber.get("group_id"))));
    }

    /** Stores the given subscriber id in {@code fmq_topic_id} of this trigger's row. */
    private void bindRequiredMqTopic(long j) {
        // NOTE(review): the boolean passed to TX.getConnection differs from the read-only
        // queries (false here, true there); its exact meaning is not visible in this file.
        Connection connection = TX.getConnection(DB_ROUTE, false);
        try {
            DbUtil.executeUpdate(
                    connection,
                    "UPDATE t_iscx_datax_trigger SET fmq_topic_id =? WHERE fid=?",
                    D.asList(new Object[]{Long.valueOf(j), Long.valueOf(this.dataFlowTriggerId)}),
                    D.asList(new Integer[]{Types.BIGINT, Types.BIGINT}));
        } finally {
            // Close exactly once, whether or not the update succeeded.
            DbUtil.close(connection, true);
        }
    }

    /** Reads {@code fmq_topic_id} of this trigger's row, converted to {@code long} via {@code D.l}. */
    private long getOriginalMqTopic() {
        return queryLong(
                "SELECT fmq_topic_id FROM t_iscx_datax_trigger WHERE fid=?",
                new Object[]{Long.valueOf(this.dataFlowTriggerId)},
                new Integer[]{Types.BIGINT});
    }

    /** Returns the id of the matching subscriber, creating one when the lookup yields zero. */
    private long getOrCreateRequiredMqTopic() {
        long existingId = findRequiredMqTopic();
        return existingId != 0 ? existingId : createRequiredMqTopic();
    }

    /**
     * Creates and saves a new subscriber record populated from the event and connector.
     *
     * @return the freshly generated id of the saved record
     */
    private long createRequiredMqTopic() {
        long newId = IDService.get().genLongId();
        DynamicObject subscriber = BusinessDataServiceHelper.newDynamicObject(SUBSCRIBER_ENTITY);
        subscriber.set("id", Long.valueOf(newId));
        subscriber.set("group_id", Long.valueOf(this.connector.getDbLink()));
        subscriber.set("number", this.event.getMQTopicNumber());
        subscriber.set("name", this.event.getName());
        subscriber.set("isv", "ISC.DataFlow");
        subscriber.set("status", "C");
        subscriber.set("creator", Long.valueOf(RequestContext.get().getCurrUserId()));
        subscriber.set("createtime", new Timestamp(System.currentTimeMillis()));
        // Protection level and allowed subscriber address depend on SystemContext.isProcEnv().
        boolean procEnv = SystemContext.isProcEnv();
        subscriber.set("protect_level", procEnv ? "READ_ONLY" : "DEFAULT");
        subscriber.set("subscriber_ip_pattern", procEnv ? "*" : NetUtil.getLocalAddress());
        subscriber.set("multi_line", Boolean.valueOf(this.event.isArray()));
        subscriber.set("parse_script", this.event.getScriptRemark());
        subscriber.set("parse_script_tag", this.event.getScriptText());
        subscriber.set("msg_digest", this.event.getDataModel().getDigestFormat());
        subscriber.set("charset", this.event.getCharset());
        ConnectorUtil.save(subscriber);
        return newId;
    }

    /**
     * Looks up the subscriber whose number equals the event's topic number and whose
     * {@code fmq_server} equals the connector's db link; the scalar result is converted to
     * {@code long} via {@code D.l}.
     */
    private long findRequiredMqTopic() {
        return queryLong(
                "SELECT fid FROM t_iscb_mq_subscriber WHERE fnumber=? AND fmq_server=?",
                new Object[]{this.event.getMQTopicNumber(), Long.valueOf(this.connector.getDbLink())},
                new Integer[]{Types.VARCHAR, Types.BIGINT});
    }

    /**
     * Runs a scalar query on a connection obtained with {@code TX.getConnection(DB_ROUTE, true)}
     * and converts the result with {@code D.l}; the connection is always closed afterwards.
     *
     * @param sql    parameterized SQL text
     * @param params positional parameter values
     * @param types  {@link Types} codes matching {@code params} one-to-one
     */
    private long queryLong(String sql, Object[] params, Integer[] types) {
        Connection connection = TX.getConnection(DB_ROUTE, true);
        try {
            return D.l(DbUtil.executeScalar(connection, sql, D.asList(params), D.asList(types)));
        } finally {
            DbUtil.close(connection, true);
        }
    }
}
