package kd.isc.iscx.platform.core.res.runtime;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import kd.bos.dataentity.resource.ResManager;
import kd.isc.iscb.util.dt.D;
import kd.isc.iscb.util.except.IscBizException;
import kd.isc.iscb.util.flow.core.Event;
import kd.isc.iscb.util.flow.core.FC;
import kd.isc.iscb.util.flow.core.Flow;
import kd.isc.iscb.util.flow.core.FlowBuilder;
import kd.isc.iscb.util.flow.core.Node;
import kd.isc.iscb.util.flow.core.NodeBuilder;
import kd.isc.iscb.util.flow.core.Transition;
import kd.isc.iscb.util.g.Graph;
import kd.isc.iscx.platform.core.res.ResourceUtil;
import kd.isc.iscx.platform.core.res.meta.SystemInfo;
import kd.isc.iscx.platform.core.res.meta.dm.AbstractDataModel;
import kd.isc.iscx.platform.core.res.meta.event.AbstractEventModel;
import kd.isc.iscx.platform.core.res.runtime.job.AbstractBatchApplication;
import kd.isc.iscx.platform.core.res.runtime.job.DataFlowNodeExecutionSync;
import kd.isc.iscx.platform.core.res.runtime.job.task.FiberNodeCheckCancelSignalListener;
import kd.isc.iscx.platform.core.res.runtime.job.task.FiberNodeCompletedListener;
import kd.isc.iscx.platform.core.res.runtime.job.task.FiberNodeResumeListener;
import kd.isc.iscx.platform.core.res.runtime.job.task.FiberNodeSuspendListener;
import kd.isc.iscx.platform.core.res.runtime.job.task.FiberRuntimeCompletedListener;
import kd.isc.iscx.platform.core.res.runtime.job.task.FiberRuntimeTerminatedListener;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:kd/isc/iscx/platform/core/res/runtime/DataFlowParser.class */
public class DataFlowParser {
    DataFlowParser() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Flow parse(DataFlowDefine dataFlowDefine, Map<String, Object> map) {
        FlowBuilder newInstance = FC.getFactory().newInstance(String.valueOf(dataFlowDefine.getId()), dataFlowDefine.getName() + "(" + dataFlowDefine.getNumber() + ")");
        buildNodes(map, newInstance, dataFlowDefine);
        buildTransition(map, newInstance);
        setNodesFeature(map, newInstance);
        buildDataStreamTrigger(dataFlowDefine, newInstance);
        newInstance.getRootBuilder().listener(Event.ON_TERMINATED, FiberRuntimeTerminatedListener.L);
        Flow end = newInstance.end();
        checkPathCycle(end);
        checkConcurrentBatchTask(end);
        return end;
    }

    private static void setNodesFeature(Map<String, Object> map, FlowBuilder flowBuilder) {
        Iterator it = ((List) ResourceUtil.getValue(map, "define", "nodes")).iterator();
        while (it.hasNext()) {
            NodeBuilder node = flowBuilder.getNode(D.s(((Map) it.next()).get("id")));
            if (node.inComingsCount() > 1) {
                node.smartMerge();
            }
            if (node.outGoingsCount() > 1) {
                node.andSplit();
            }
        }
    }

    private static void buildNodes(Map<String, Object> map, FlowBuilder flowBuilder, DataFlowDefine dataFlowDefine) {
        for (Map map2 : (List) ResourceUtil.getValue(map, "define", "nodes")) {
            NodeBuilder node = flowBuilder.node(D.s(map2.get("id")), D.s(map2.get("name")));
            Object value = ResourceUtil.getValue(map2, "details", "resource", "id");
            if (value != null) {
                Object resource = ResourceUtil.getResource(D.l(value));
                if (resource instanceof DataHandler) {
                    buildNode(dataFlowDefine, map2, node, (DataHandler) resource);
                } else {
                    if (!(resource instanceof AbstractEventModel) || !node.getId().equals("1")) {
                        throw new IscBizException(String.format(ResManager.loadKDString("资源“%1$s”没有实现 ForDataFlow 接口，不能用于节点“%2$s”。", "DataFlowParser_13", "isc-iscx-platform-core", new Object[0]), resource, node.getTitle()));
                    }
                    buildStartNode(flowBuilder, node, dataFlowDefine, (AbstractEventModel) resource, map2);
                }
            }
            if (node.getApplication() == null) {
                node.listener(Event.ON_COMPLETED, FiberNodeCompletedListener.L);
            }
            node.listener(Event.ON_SUSPENED, FiberNodeSuspendListener.L);
            node.listener(Event.ON_RESUMED, FiberNodeResumeListener.L);
            node.setSynchronizer(DataFlowNodeExecutionSync.IMPL);
            if ("End".equals(map2.get("type"))) {
                node.listener(Event.ON_COMPLETED, FiberRuntimeCompletedListener.L);
                node.smartMerge();
            }
        }
    }

    private static void buildStartNode(FlowBuilder flowBuilder, NodeBuilder nodeBuilder, DataFlowDefine dataFlowDefine, AbstractEventModel abstractEventModel, Map<String, Object> map) {
        createVariable(nodeBuilder.getFlowBuilder(), abstractEventModel.getOutput());
        flowBuilder.setAttribute("EVENT_CONNECTOR", getEventConnector(dataFlowDefine, abstractEventModel, map));
        flowBuilder.setAttribute("EVENT_MODEL", abstractEventModel);
        flowBuilder.setAttribute("EVENT_NODE_ID", nodeBuilder.getId());
    }

    private static Connector getEventConnector(DataFlowDefine dataFlowDefine, AbstractEventModel abstractEventModel, Map<String, Object> map) {
        Map<String, Connector> connectors = getConnectors(dataFlowDefine, map);
        SystemInfo systemInfo = abstractEventModel.getSystemInfo();
        if (systemInfo == null) {
            return null;
        }
        return connectors.get(systemInfo.getNumber());
    }

    private static void buildDataStreamTrigger(DataFlowDefine dataFlowDefine, FlowBuilder flowBuilder) {
        AbstractEventModel eventModel = dataFlowDefine.getEventModel();
        if (!eventModel.equals(flowBuilder.getAttribute("EVENT_MODEL"))) {
            throw new IscBizException(ResManager.loadKDString("数据流开始节点的事件模型与启动方案的事件模型不一致。", "DataFlowParser_2", "isc-iscx-platform-core", new Object[0]));
        }
        dataFlowDefine.initTrigger((Connector) flowBuilder.getAttribute("EVENT_CONNECTOR"), (String) flowBuilder.getAttribute("EVENT_NODE_ID"));
        createVariable(flowBuilder, eventModel.getOutput());
    }

    private static void buildNode(DataFlowDefine dataFlowDefine, Map<String, Object> map, NodeBuilder nodeBuilder, DataHandler dataHandler) {
        createVariable(nodeBuilder.getFlowBuilder(), dataHandler.getInput());
        createVariable(nodeBuilder.getFlowBuilder(), dataHandler.getOutput());
        dataHandler.build(nodeBuilder, (Map) map.get("details"), getConnectors(dataFlowDefine, map), D.x(map.get("batch_mode")));
        checkBatchMode(map, dataHandler, nodeBuilder);
        nodeBuilder.listener(Event.ON_READY, FiberNodeCheckCancelSignalListener.L);
        if (!dataHandler.isFiberTerminal()) {
            nodeBuilder.listener(Event.ON_COMPLETED, FiberNodeCompletedListener.L);
        }
        nodeBuilder.setAttribute("is_terminal", Boolean.valueOf(dataHandler.isFiberTerminal()));
    }

    private static void checkBatchMode(Map<String, Object> map, DataHandler dataHandler, NodeBuilder nodeBuilder) {
        if (!(dataHandler instanceof BatchSupportable)) {
            checkNodeForNotBatchMode(nodeBuilder);
        } else if (D.x(map.get("batch_mode"))) {
            checkNodeForBatchMode(nodeBuilder);
        } else {
            checkNodeForNotBatchMode(nodeBuilder);
        }
    }

    private static void checkNodeForNotBatchMode(NodeBuilder nodeBuilder) {
        if (nodeBuilder.getApplication() instanceof AbstractBatchApplication) {
            throw new IscBizException(String.format(ResManager.loadKDString("节点“%s”没有启用批处理模式，但它的 application 继承了 AbstractBatchApplication。", "DataFlowParser_14", "isc-iscx-platform-core", new Object[0]), nodeBuilder));
        }
        if (nodeBuilder.getCallback() != null) {
            throw new IscBizException(String.format(ResManager.loadKDString("节点“%s”没有启用了批处理模式，但它设置了 callback。", "DataFlowParser_15", "isc-iscx-platform-core", new Object[0]), nodeBuilder));
        }
    }

    private static void checkNodeForBatchMode(NodeBuilder nodeBuilder) {
        if (!(nodeBuilder.getApplication() instanceof AbstractBatchApplication)) {
            throw new IscBizException(String.format(ResManager.loadKDString("节点“%s”启用了批处理模式，但它的 application 没有继承 AbstractBatchApplication。", "DataFlowParser_16", "isc-iscx-platform-core", new Object[0]), nodeBuilder));
        }
        if (nodeBuilder.getCallback() == null) {
            throw new IscBizException(String.format(ResManager.loadKDString("节点“%s”启用了批处理模式，但它没有设置 callback，将导致数据线任务无法与批处理任务正确交互。", "DataFlowParser_17", "isc-iscx-platform-core", new Object[0]), nodeBuilder));
        }
        if (nodeBuilder.getCallback() != BatchSupportable.WAIT_FOR_BATCH_TASK) {
            throw new IscBizException(String.format(ResManager.loadKDString("节点“%s”启用了批处理模式，但它设置的 callback 不是 BatchSupportable.WAIT_FOR_BATCH_TASK。", "DataFlowParser_18", "isc-iscx-platform-core", new Object[0]), nodeBuilder));
        }
    }

    private static void createVariable(FlowBuilder flowBuilder, AbstractDataModel abstractDataModel) {
        String variableName = abstractDataModel.getVariableName();
        if (flowBuilder.getVariable(variableName) == null) {
            flowBuilder.variable(variableName, variableName, abstractDataModel.getName(), abstractDataModel.getDataType()).setAttribute("data_model", abstractDataModel);
        }
    }

    private static Map<String, Connector> getConnectors(DataFlowDefine dataFlowDefine, Map<String, Object> map) {
        List<Map> list = (List) ResourceUtil.getValue(map, "details", "connctor_binding");
        if (list == null) {
            return Collections.emptyMap();
        }
        LinkedHashMap linkedHashMap = new LinkedHashMap(list.size());
        for (Map map2 : list) {
            String s = D.s(map2.get("system_number"));
            String s2 = D.s(map2.get("system_connector"));
            if (s2 == null) {
                throw new IllegalArgumentException(String.format(ResManager.loadKDString("节点“%s”没有指定连接器。", "DataFlowParser_19", "isc-iscx-platform-core", new Object[0]), map.get("name")));
            }
            linkedHashMap.put(s, dataFlowDefine.getConnector(s2));
        }
        return linkedHashMap;
    }

    private static void buildTransition(Map<String, Object> map, FlowBuilder flowBuilder) {
        List<Map> list = (List) ResourceUtil.getValue(map, "define", "edges");
        HashSet hashSet = new HashSet(list.size());
        for (Map map2 : list) {
            String s = D.s(map2.get("src_node_id"));
            String s2 = D.s(map2.get("dst_node_id"));
            String str = s + "-" + s2;
            if (hashSet.add(str)) {
                flowBuilder.transition(str, str, s, s2, Transition.Type.NORMAL);
            }
        }
    }

    private static void checkPathCycle(Flow flow) {
        Graph graph = new Graph();
        for (Node node : flow.getRoot().getChildren()) {
            graph.appendVertex(node.getId(), node.getId());
        }
        Iterator it = flow.getRoot().getChildren().iterator();
        while (it.hasNext()) {
            for (Transition transition : ((Node) it.next()).getOutGoing()) {
                if (!D.x(transition.getSource().getAttribute("is_terminal"))) {
                    graph.appendEdge(transition.getSource().getId(), transition.getTarget().getId(), transition);
                }
            }
        }
        if (graph.topSort().size() != flow.getRoot().getChildren().size()) {
            throw new IscBizException(ResManager.loadKDString("数据流图中存在环，不可启用。", "DataFlowParser_10", "isc-iscx-platform-core", new Object[0]));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static int calcTerminalNodeCount(Flow flow) {
        int i = 0;
        Iterator it = flow.getRoot().getChildren().iterator();
        while (it.hasNext()) {
            if (D.x(((Node) it.next()).getAttribute("is_terminal"))) {
                i++;
            }
        }
        return i;
    }

    private static void checkConcurrentBatchTask(Flow flow) {
        ArrayList arrayList = new ArrayList();
        for (Node node : flow.getRoot().getChildren()) {
            if (node.getApplication() instanceof AbstractBatchApplication) {
                arrayList.add(node);
            }
        }
        if (arrayList.size() <= 1) {
            return;
        }
        kd.isc.iscb.util.flow.core.i.arithmetic.Graph graph = flow.getRoot().getGraph();
        Node[] nodeArr = (Node[]) arrayList.toArray(new Node[arrayList.size()]);
        int length = nodeArr.length - 1;
        for (int i = 0; i < length; i++) {
            for (int i2 = 1; i2 < nodeArr.length; i2++) {
                Node node2 = nodeArr[i];
                Node node3 = nodeArr[i2];
                if (!isReachable(graph, node2, node3)) {
                    throw new IscBizException(String.format(ResManager.loadKDString("节点“%1$s”与节点“%2$s”之间的执行顺序不确定，请在二者之间添加连线确定执行的先后顺序。", "DataFlowParser_20", "isc-iscx-platform-core", new Object[0]), node2, node3));
                }
            }
        }
    }

    private static boolean isReachable(kd.isc.iscb.util.flow.core.i.arithmetic.Graph graph, Node node, Node node2) {
        return graph.reach(node.getId(), node2.getId()) || graph.reach(node2.getId(), node.getId());
    }
}
