package kd.isc.iscx.platform.core.res.runtime.job;

import java.sql.Connection;
import java.sql.Timestamp;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import kd.bos.dataentity.resource.ResManager;
import kd.bos.db.tx.TX;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.servicehelper.BusinessDataServiceHelper;
import kd.isc.iscb.platform.core.IscRuntimeInfo;
import kd.isc.iscb.platform.core.dc.e.DataCopyRunner;
import kd.isc.iscb.platform.core.dc.e.MQueueWriter;
import kd.isc.iscb.platform.core.job.Job;
import kd.isc.iscb.platform.core.job.JobEngine;
import kd.isc.iscb.platform.core.job.JobFactory;
import kd.isc.iscb.platform.core.task.ScheduleManager;
import kd.isc.iscb.platform.core.task.SignalManager;
import kd.isc.iscb.platform.core.task.Task;
import kd.isc.iscb.util.db.DbUtil;
import kd.isc.iscb.util.dt.D;
import kd.isc.iscb.util.except.IscBizException;
import kd.isc.iscb.util.except.TaskCancelException;
import kd.isc.iscb.util.misc.StringUtil;
import kd.isc.iscb.util.script.context.Context;
import kd.isc.iscx.platform.core.res.meta.ds.NoticeSend;

/* loaded from: input_file:kd/isc/iscx/platform/core/res/runtime/job/DataStreamJob.class */
public final class DataStreamJob implements Job {
    private long id;
    private String number;
    private long mutex;
    private boolean isLight;
    private volatile DataStream stream;
    private Timestamp start_time;
    private static Log logger = LogFactory.getLog(DataStreamJob.class);
    private AtomicInteger threadCount = new AtomicInteger(0);

    public DataStreamJob(long j, String str, long j2, boolean z) {
        this.id = j;
        this.number = str;
        this.mutex = j2;
        this.isLight = z;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DataStreamJob(String str, String str2) {
        this.number = str;
        String[] split = str2.split(",");
        this.id = D.l(split[0]);
        this.mutex = D.l(split[1]);
        this.isLight = D.x(split[2]);
    }

    public String getParam() {
        return this.id + "," + this.mutex + "," + this.isLight;
    }

    public String getTitle() {
        return this.number;
    }

    public boolean isLightTask() {
        return this.isLight;
    }

    public Date getStartTime() {
        return this.start_time;
    }

    public JobFactory getFactory() {
        return new DataStreamJobFactory();
    }

    public long getOwnerId() {
        return this.id;
    }

    public long getMutex() {
        return this.mutex;
    }

    public DataStream getStream() {
        return this.stream;
    }

    public Job.Progress getRealtimeProgress() {
        if (this.stream == null) {
            return null;
        }
        int totalCount = this.stream.getCounter().getTotalCount();
        int completedCount = this.stream.getCounter().getCompletedCount();
        int ommittedCount = this.stream.getCounter().getOmmittedCount();
        int failedCount = this.stream.getCounter().getFailedCount();
        return new Job.Progress(totalCount, completedCount + failedCount + ommittedCount, String.format(ResManager.loadKDString("%1$s; 工作区:%2$s, 线程数：%3$s", "DataStreamJob_3", "isc-iscx-platform-core", new Object[0]), this.stream.getContextDigest(), Integer.valueOf(this.stream.getWorkArea().dataJobsCount()), this.threadCount + ", " + Math.max(1L, (System.currentTimeMillis() - this.start_time.getTime()) / 1000)));
    }

    public String getJobSummary() {
        if (this.stream == null) {
            return "";
        }
        String contextDigest = this.stream.getContextDigest();
        if (this.stream.getWorkArea().isFull()) {
            contextDigest = (contextDigest + ", ") + ResManager.loadKDString("工作区: 满载", "DataStreamJob_2", "isc-iscx-platform-core", new Object[0]);
        }
        return contextDigest.trim();
    }

    public void run() {
        if (setRunning()) {
            try {
                execute();
            } finally {
                handleRetry();
            }
        }
    }

    public void executeWithoutRetry() {
        if (!setRunning()) {
            throw new IscBizException(String.format(ResManager.loadKDString("数据流（%s）正在执行中或已结束。", "DataStreamJob_4", "isc-iscx-platform-core", new Object[0]), this.number));
        }
        execute();
    }

    private void execute() {
        try {
            try {
                publishStartMessage();
                IscRuntimeInfo.get().incDataFlowExecuteTotalCount();
                tryDeleteLogs();
                SignalManager.registerTask(D.s(Long.valueOf(this.id)));
                ScheduleManager.submit(new DataStreamProgress(this), this.stream.getDataFlow().getCheckPoint());
                doMainTask();
                SignalManager.unregisterTask(D.s(Long.valueOf(this.id)));
                waitForSubTasks();
                updateStateAndDisposeTask();
                publishEndMessage();
                sendDataStreamNotice();
            } catch (TaskCancelException e) {
                this.stream.getWorkArea().terminate();
                throw e;
            }
        } catch (Throwable th) {
            SignalManager.unregisterTask(D.s(Long.valueOf(this.id)));
            waitForSubTasks();
            updateStateAndDisposeTask();
            publishEndMessage();
            sendDataStreamNotice();
            throw th;
        }
    }

    private void handleRetry() {
        int executeCount;
        if (DataStreamState.F != this.stream.getWorkArea().getDataStreamState() || (executeCount = this.stream.getExecuteCount() - 1) >= this.stream.getDataFlow().getMaxRetryTimes()) {
            return;
        }
        JobEngine.submit(new DataStreamJob(this.id, this.number, this.mutex, this.isLight), new Timestamp(System.currentTimeMillis() + (this.stream.getDataFlow().getRetryInterval(executeCount) * 60000)));
    }

    private void publishStartMessage() {
        try {
            publishMsg(this.stream.getDataFlow().getStartMqTopic(), "IscxDataFlowEventStart");
        } catch (Exception e) {
            logger.warn("数据流启动方案执行时通知mq发生错误,启动方案id:" + this.stream.getDataFlow().getDataFlowTriggerId(), e);
        }
    }

    private void publishEndMessage() {
        try {
            publishMsg(this.stream.getDataFlow().getEndMqTopic(), "IscxDataFlowEventEnd");
        } catch (Exception e) {
            logger.warn("数据流启动方案结束时回调mq发生错误,启动方案id:" + this.stream.getDataFlow().getDataFlowTriggerId(), e);
        }
    }

    private void publishMsg(long j, String str) {
        if (j != 0) {
            TriggerMessageParam triggerMessageParam = new TriggerMessageParam(j, 1);
            MQueueWriter mQueueWriter = null;
            try {
                Map<String, Object> buildMsg = buildMsg(str);
                mQueueWriter = new MQueueWriter(triggerMessageParam);
                mQueueWriter.write(buildMsg);
                if (mQueueWriter != null) {
                    mQueueWriter.close();
                }
            } catch (Throwable th) {
                if (mQueueWriter != null) {
                    mQueueWriter.close();
                }
                throw th;
            }
        }
    }

    private Map<String, Object> buildMsg(String str) {
        Context context = this.stream.getContext();
        HashMap hashMap = new HashMap(16);
        hashMap.put("stream_id", context.get("stream_id"));
        hashMap.put("stream_number", context.get("stream_number"));
        hashMap.put("trigger_id", context.get("trigger_id"));
        hashMap.put("trigger_number", context.get("trigger_number"));
        hashMap.put("state", context.get("state"));
        hashMap.put("total", context.get("total"));
        hashMap.put("success", context.get("success"));
        hashMap.put("terminated", context.get("terminated"));
        hashMap.put("ommitted", context.get("ommitted"));
        hashMap.put("start_time", context.get("start_time"));
        hashMap.put("params", context.get("params"));
        hashMap.put("execute_count", context.get("execute_count"));
        hashMap.put("last_error", StringUtil.getCascadeMessage((Throwable) context.get("last_error")));
        hashMap.put("type", str);
        return hashMap;
    }

    private void sendDataStreamNotice() {
        NoticeSend successNotice;
        DataStreamState dataStreamState = this.stream.getWorkArea().getDataStreamState();
        if (DataStreamState.F == dataStreamState) {
            NoticeSend failedNotice = this.stream.getDataFlow().getFailedNotice();
            if (failedNotice != null) {
                failedNotice.send(createContext(), NoticeSend.NoticeType.warning);
                return;
            }
            return;
        }
        if (DataStreamState.S != dataStreamState || (successNotice = this.stream.getDataFlow().getSuccessNotice()) == null) {
            return;
        }
        successNotice.send(createContext(), NoticeSend.NoticeType.message);
    }

    private Map<String, Object> createContext() {
        HashMap hashMap = new HashMap();
        hashMap.put("$data", this.stream.getParams());
        hashMap.put("$flow", this.stream.getFiberFlow());
        hashMap.put("$stream", this.stream.getContext());
        return hashMap;
    }

    private void doMainTask() {
        while (this.stream.getWorkArea().resetRunning()) {
            try {
                doTask();
            } catch (Throwable th) {
                handleUnexpectedError(th);
            }
        }
    }

    private void handleUnexpectedError(Throwable th) {
        if (th instanceof TaskCancelException) {
            throw ((TaskCancelException) th);
        }
        Util.saveErrorLog(this.stream, th);
    }

    private void waitForSubTasks() {
        while (this.stream.getWorkArea().getCurrentThreads() > 1) {
            D.sleep(20L);
        }
    }

    private void tryDeleteLogs() {
        if (D.i(Integer.valueOf(this.stream.getExecuteCount())) > 1) {
            Connection connection = TX.getConnection("ISCB", false);
            try {
                DbUtil.executeUpdate(connection, "DELETE FROM t_iscx_data_stream_log WHERE fdata_stream=?", D.asList(new Object[]{Long.valueOf(this.id)}), D.asList(new Integer[]{-5}));
            } finally {
                DbUtil.close(connection, true);
            }
        }
    }

    private void updateStateAndDisposeTask() {
        try {
            this.stream.getWorkArea().dispose();
        } finally {
            DataStreamState.update(this.stream);
        }
    }

    private boolean setRunning() {
        boolean running = DataStreamState.setRunning(this.id);
        if (running) {
            this.start_time = new Timestamp(System.currentTimeMillis());
            this.stream = new DataStream(BusinessDataServiceHelper.loadSingle(Long.valueOf(this.id), "iscx_data_stream"), this);
            this.stream.getWorkArea().setRunning();
        }
        return running;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doTask() {
        this.threadCount.incrementAndGet();
        try {
            WorkArea workArea = this.stream.getWorkArea();
            int i = 0;
            while (true) {
                DataTask dequeue = workArea.dequeue();
                if (dequeue != null) {
                    try {
                        i = 0;
                        dequeue.run();
                        workArea.release(dequeue);
                    } finally {
                    }
                } else {
                    i++;
                    if (i > 5) {
                        return;
                    } else {
                        D.sleep(1L);
                    }
                }
            }
        } finally {
            this.threadCount.decrementAndGet();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startSubTaskThread() {
        final List currentTasks = SignalManager.currentTasks();
        DataCopyRunner.submitDataStreamTask(new Task() { // from class: kd.isc.iscx.platform.core.res.runtime.job.DataStreamJob.1
            private String id = UUID.randomUUID().toString();

            public String getId() {
                return this.id;
            }

            public void run() {
                try {
                    SignalManager.registerOnSubTaskBegin(currentTasks);
                    try {
                        DataStreamJob.this.doTask();
                        SignalManager.unregisterOnSubTaskEnd();
                    } finally {
                    }
                } finally {
                    DataStreamJob.this.stream.getWorkArea().decCurrentThreads();
                }
            }
        });
    }
}
