package kd.isc.iscx.platform.core.res.runtime.job.task;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import kd.bos.dataentity.resource.ResManager;
import kd.isc.iscb.util.dt.D;
import kd.isc.iscb.util.except.IscBizException;
import kd.isc.iscb.util.flow.core.Execution;
import kd.isc.iscb.util.misc.Pair;
import kd.isc.iscx.platform.core.res.meta.dm.AbstractDataModel;
import kd.isc.iscx.platform.core.res.runtime.DataFlowException;
import kd.isc.iscx.platform.core.res.runtime.job.AbstractBatchApplication;
import kd.isc.iscx.platform.core.res.runtime.job.DataStream;
import kd.isc.iscx.platform.core.res.runtime.job.DataTask;

/* loaded from: input_file:kd/isc/iscx/platform/core/res/runtime/job/task/BatchTask.class */
/**
 * A {@link DataTask} that collects fiber tasks waiting on the same flow node and
 * hands them to the node's {@link AbstractBatchApplication} in a single call.
 *
 * <p>The task starts in {@code Blocked} state; fibers may only be appended while
 * it is still blocked. When the job runs it builds one
 * {@link AbstractBatchApplication.Data} per fiber, invokes the application, writes
 * the outputs back to each fiber's root execution and then calls
 * {@code resumeBatch}.
 */
public final class BatchTask extends DataTask {
    private final int batchSize;
    private final String nodeId;
    private final String nodeTitle;
    /** Fiber id -> (fiber task, string passed to {@link #appendFiberTask}). The task is null after a resume. */
    private final Map<Long, Pair<FiberTask, String>> fibers;
    private final AbstractBatchApplication app;
    private final AbstractDataModel input;
    private final AbstractDataModel output;

    /**
     * Creates a new, blocked batch task for the given flow node.
     *
     * @param dataStream the owning data stream
     * @param i          requested batch size; clamped to {@code [1, dataFlow.getMaxBatchSize()]}
     * @param str        id of the flow node whose application processes the batch
     */
    public BatchTask(DataStream dataStream, int i, String str) {
        super(dataStream);
        this.fibers = new HashMap<>();
        this.batchSize = Math.max(1, Math.min(i, dataStream.getDataFlow().getMaxBatchSize()));
        this.nodeId = str;
        this.nodeTitle = dataStream.getFiberFlow().getNode(str).getTitle();
        this.app = (AbstractBatchApplication) dataStream.getFiberFlow().getNode(str).getApplication();
        this.input = this.app.getDataHandler().getInput();
        this.output = this.app.getDataHandler().getOutput();
        setState(DataTask.State.Blocked);
    }

    /**
     * Restores a batch task from the map written by {@link #innerToJson(Map)}.
     *
     * <p>Fiber tasks are not restored here; only their ids and associated string
     * values are. The tasks themselves are looked up from the stream on demand.
     *
     * @param dataStream the owning data stream
     * @param map        persisted state containing {@code batch_size}, {@code node_id} and {@code fibers}
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public BatchTask(DataStream dataStream, Map<String, Object> map) {
        super(dataStream, map);
        this.batchSize = D.i(map.get("batch_size"));
        this.nodeId = D.s(map.get("node_id"));
        // The title is not persisted, so reload it from the flow node; otherwise the
        // DataFlowExceptions raised by getInput() would carry a null node title.
        this.nodeTitle = dataStream.getFiberFlow().getNode(this.nodeId).getTitle();

        Map<?, ?> saved = (Map<?, ?>) map.get("fibers");
        Map<Long, Pair<FiberTask, String>> restored = new HashMap<>(saved.size());
        for (Map.Entry<?, ?> entry : saved.entrySet()) {
            Object value = entry.getValue();
            // An entry is either a map carrying the string under "value", or the bare string itself.
            String label = value instanceof Map
                    ? (String) ((Map<?, ?>) value).get("value")
                    : (String) value;
            restored.put(Long.valueOf(D.l(entry.getKey())), new Pair<FiberTask, String>(null, label));
        }
        this.fibers = restored;

        this.app = (AbstractBatchApplication) dataStream.getFiberFlow().getNode(this.nodeId).getApplication();
        this.input = this.app.getDataHandler().getInput();
        this.output = this.app.getDataHandler().getOutput();
        compareAndSetState(DataTask.State.Ready, DataTask.State.Blocked);
    }

    /**
     * Writes the state needed by the restoring constructor into {@code map}.
     *
     * @param map target map; receives {@code batch_size}, {@code node_id} and {@code fibers}
     */
    @Override // kd.isc.iscx.platform.core.res.runtime.job.DataTask
    protected void innerToJson(Map<String, Object> map) {
        map.put("batch_size", Integer.valueOf(this.batchSize));
        map.put("node_id", this.nodeId);
        // NOTE(review): the live fibers map (with Pair values) is stored as-is; how the Pair
        // is rendered is decided by the serializer, which is not visible here.
        map.put("fibers", this.fibers);
    }

    /** @return always {@code true} */
    @Override // kd.isc.iscx.platform.core.res.runtime.job.DataTask
    public boolean isResumeable() {
        return true;
    }

    /**
     * Registers a fiber task with this batch.
     *
     * @param fiberTask the fiber to add; its id must not already be registered
     * @param str       string value stored alongside the fiber and later passed to
     *                  {@link AbstractBatchApplication.Data}
     * @throws IscBizException if this task is not in {@code Blocked} state or the fiber
     *                         id is already present
     */
    public synchronized void appendFiberTask(FiberTask fiberTask, String str) {
        DataTask.State state = getState();
        if (DataTask.State.Blocked != state) {
            throw new IscBizException(String.format(ResManager.loadKDString("当前任务状态（%s）禁止添加子任务。", "BatchTask_6", "isc-iscx-platform-core", new Object[0]), state));
        }
        Long fiberId = Long.valueOf(fiberTask.getId());
        if (this.fibers.putIfAbsent(fiberId, new Pair<FiberTask, String>(fiberTask, str)) != null) {
            throw new IscBizException(String.format(ResManager.loadKDString("重复添加子任务（%s）", "BatchTask_5", "isc-iscx-platform-core", new Object[0]), fiberId));
        }
    }

    /**
     * Builds a digest string: the data handler name followed by the input-model
     * digest of every registered fiber's input variable, each terminated by "; ".
     *
     * @return the digest text
     */
    @Override // kd.isc.iscx.platform.core.res.runtime.job.DataTask
    @SuppressWarnings({"rawtypes", "unchecked"})
    public String innerGetContextDigest() {
        String inputVar = this.input.getVariableName();
        StringBuilder sb = new StringBuilder(1024);
        sb.append(this.app.getDataHandler().getName()).append(" : ");
        for (Long fiberId : this.fibers.keySet()) {
            Object value = getStream().getFiberTask(fiberId).getFlowRuntime().get(inputVar);
            sb.append(this.input.digest((Map) value)).append("; ");
        }
        return sb.toString();
    }

    /** @return the effective (clamped or restored) batch size */
    public int getBatchSize() {
        return this.batchSize;
    }

    /** @return the number of fibers currently registered */
    public int getFiberCount() {
        return this.fibers.size();
    }

    /** @return the id of the flow node this batch belongs to */
    public String getNodeId() {
        return this.nodeId;
    }

    /**
     * Runs the batch: prepares per-fiber data, invokes the application, writes the
     * outputs back, calls {@code resumeBatch} and marks the task {@code Success}.
     */
    @Override // kd.isc.iscx.platform.core.res.runtime.job.DataTask
    protected void doJob() {
        String inputVar = this.input.getVariableName();
        String outputVar = this.output.getVariableName();
        List<AbstractBatchApplication.Data> batch = prepareBatch(inputVar, outputVar);
        doBatch(batch);
        setOutputData(batch, inputVar, outputVar);
        resumeBatch(batch);
        setState(DataTask.State.Success);
    }

    /**
     * Invokes the batch application and records the elapsed time for the node,
     * whether or not the call succeeds.
     *
     * @param list the prepared batch
     * @throws DataFlowException if the application fails; non-DataFlowException
     *                           failures are wrapped with node and input context
     */
    private void doBatch(List<AbstractBatchApplication.Data> list) {
        final long startedAt = System.currentTimeMillis();
        try {
            this.app.doBatch(getStream(), list);
        } catch (Throwable th) {
            // Attach node id/title and the batch inputs so the failure can be diagnosed.
            throw wrapError(th, list);
        } finally {
            getStream().addElapsedTime(this.nodeId, startedAt);
        }
    }

    /**
     * Converts any failure into a {@link DataFlowException}.
     *
     * @param th   the original failure
     * @param list the batch being processed when the failure occurred
     * @return {@code th} itself if it already is a DataFlowException, otherwise a new
     *         one carrying the node id, node title, input model, batch inputs and cause
     */
    private DataFlowException wrapError(Throwable th, List<AbstractBatchApplication.Data> list) {
        if (th instanceof DataFlowException) {
            return (DataFlowException) th;
        }
        return new DataFlowException(this.nodeId, this.nodeTitle, this.input, getInputData(list), th);
    }

    /**
     * Collects the input of every batch element, preserving order.
     *
     * @param list the batch
     * @return a new list with one input per element
     */
    private static List<Map<String, Object>> getInputData(List<AbstractBatchApplication.Data> list) {
        List<Map<String, Object>> inputs = new ArrayList<>(list.size());
        for (AbstractBatchApplication.Data data : list) {
            inputs.add(data.getInput());
        }
        return inputs;
    }

    /**
     * Stores each element's output on its fiber's root execution under the output
     * variable name. Nothing is written when input and output share the same name.
     *
     * @param list the processed batch
     * @param str  input variable name
     * @param str2 output variable name
     */
    private void setOutputData(List<AbstractBatchApplication.Data> list, String str, String str2) {
        if (str.equals(str2)) {
            return;
        }
        for (AbstractBatchApplication.Data data : list) {
            data.getTask().getFlowRuntime().getRootExecution().set(str2, data.getOutput());
        }
    }

    /**
     * Builds one batch element per registered fiber.
     *
     * @param str  input variable name
     * @param str2 output variable name
     * @return the batch elements
     * @throws DataFlowException if a fiber's input is missing or empty
     */
    private List<AbstractBatchApplication.Data> prepareBatch(String str, String str2) {
        List<AbstractBatchApplication.Data> batch = new ArrayList<>(this.fibers.size());
        for (Map.Entry<Long, Pair<FiberTask, String>> entry : this.fibers.entrySet()) {
            Pair<FiberTask, String> value = entry.getValue();
            FiberTask fiberTask = (FiberTask) value.getKey();
            if (fiberTask == null) {
                // Restored from persisted state: only the id is known, so ask the stream.
                fiberTask = getStream().getFiberTask(entry.getKey());
            }
            Execution rootExecution = fiberTask.getFlowRuntime().getRootExecution();
            batch.add(new AbstractBatchApplication.Data(fiberTask, (String) value.getValue(), getInput(rootExecution, str), getOutput(rootExecution, str2)));
        }
        return batch;
    }

    /**
     * Reads the output variable from the execution, falling back to the output data
     * type's narrowing of an empty map when the variable is unset.
     *
     * @param execution the fiber's root execution
     * @param str       output variable name
     * @return the existing or freshly created output map
     */
    @SuppressWarnings("unchecked")
    private Map<String, Object> getOutput(Execution execution, String str) {
        Map<String, Object> current = (Map<String, Object>) execution.get(str);
        if (current == null) {
            // NOTE(review): "m12narrow" looks like a decompiler-renamed method (probably
            // "narrow") - confirm against the real data type API before compiling.
            current = this.output.getDataType().m12narrow((Object) Collections.emptyMap());
        }
        return current;
    }

    /**
     * Reads the input variable from the execution.
     *
     * @param execution the fiber's root execution
     * @param str       input variable name
     * @return the non-empty input map
     * @throws DataFlowException if the variable is unset or the map is empty
     */
    @SuppressWarnings("unchecked")
    private Map<String, Object> getInput(Execution execution, String str) {
        Map<String, Object> current = (Map<String, Object>) execution.get(str);
        if (current == null) {
            throw new DataFlowException(this.nodeId, this.nodeTitle, String.format(ResManager.loadKDString("输入数据未就绪，数据模型是：%s", "BatchTask_7", "isc-iscx-platform-core", new Object[0]), this.input.getName()));
        }
        if (current.isEmpty()) {
            throw new DataFlowException(this.nodeId, this.nodeTitle, String.format(ResManager.loadKDString("输入数据为空，数据模型是：%s", "BatchTask_8", "isc-iscx-platform-core", new Object[0]), this.input.getName()));
        }
        return current;
    }

    /** No resources to release. */
    @Override // kd.isc.iscx.platform.core.res.runtime.job.DataTask
    public void close() {
    }
}
