package kd.isc.iscx.platform.core.res.runtime.job.task;

import java.util.Map;
import java.util.zip.CRC32;
import kd.bos.dataentity.resource.ResManager;
import kd.isc.iscb.platform.core.cache.CacheableObjectManager;
import kd.isc.iscb.platform.core.connector.ConnectionWrapper;
import kd.isc.iscb.platform.core.task.SignalManager;
import kd.isc.iscb.util.dt.D;
import kd.isc.iscb.util.except.IscBizException;
import kd.isc.iscb.util.flow.core.Flow;
import kd.isc.iscb.util.io.ObjectReader;
import kd.isc.iscx.platform.core.res.meta.Resource;
import kd.isc.iscx.platform.core.res.meta.dp.AbstractDataQuery;
import kd.isc.iscx.platform.core.res.runtime.Connector;
import kd.isc.iscx.platform.core.res.runtime.DataFlowDefine;
import kd.isc.iscx.platform.core.res.runtime.DataFlowException;
import kd.isc.iscx.platform.core.res.runtime.job.DataStream;
import kd.isc.iscx.platform.core.res.runtime.job.DataTask;

/* loaded from: input_file:kd/isc/iscx/platform/core/res/runtime/job/task/StreamTask.class */
public final class StreamTask extends DataTask implements Comparable<StreamTask> {
    private Flow fiberFlow;
    private AbstractDataQuery query;
    private String startNodeId;
    private Map<String, Object> params;
    private Connector connector;
    private String[] crcFields;
    private int index;
    private int total;
    private CRC32 crc;
    private transient long targetCRC;
    private transient int targetIndex;
    private transient ObjectReader<? extends Map<String, Object>> reader;
    private transient ConnectionWrapper cn;
    private transient int bufferSize;
    private String readerSummery;

    public StreamTask(DataStream dataStream, String str, AbstractDataQuery abstractDataQuery, Connector connector, Map<String, Object> map) {
        super(dataStream);
        initBufferSize();
        this.fiberFlow = dataStream.getFiberFlow();
        this.startNodeId = str;
        this.query = abstractDataQuery;
        this.params = map;
        this.connector = connector;
        this.crcFields = abstractDataQuery.getCrcFields();
        this.index = 0;
        this.targetIndex = 0;
        this.crc = new CRC32();
        this.targetCRC = this.crc.getValue();
        this.total = Integer.MIN_VALUE;
    }

    private void initBufferSize() {
        DataFlowDefine dataFlow = getStream().getDataFlow();
        this.bufferSize = Math.max((dataFlow.getWorkAreaSize() / (dataFlow.getTerminalNodeCount() + 1)) - 1, 1);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamTask(DataStream dataStream, Map<String, Object> map) {
        super(dataStream, map);
        initBufferSize();
        this.fiberFlow = dataStream.getFiberFlow();
        this.startNodeId = D.s(map.get("startNodeId"));
        this.query = (AbstractDataQuery) CacheableObjectManager.get(Resource.class, Long.valueOf(D.l(map.get("query"))));
        this.params = (Map) map.get("params");
        this.connector = Connector.get(D.l(map.get("connector")));
        this.crcFields = this.query.getCrcFields();
        this.index = 0;
        this.targetIndex = D.i(map.get("index"));
        this.crc = new CRC32();
        this.targetCRC = D.l(map.get("crc"));
        this.total = D.i(map.get("total"));
        this.readerSummery = (String) map.get("reader");
    }

    @Override // kd.isc.iscx.platform.core.res.runtime.job.DataTask
    protected void innerToJson(Map<String, Object> map) {
        map.put("startNodeId", this.startNodeId);
        map.put("query", Long.valueOf(this.query.getId()));
        map.put("params", this.params);
        map.put("connector", Long.valueOf(this.connector.getId()));
        map.put("index", Integer.valueOf(this.targetIndex));
        map.put("total", Integer.valueOf(this.total));
        map.put("crc", Long.valueOf(this.targetCRC));
        map.put("reader", this.readerSummery);
    }

    @Override // kd.isc.iscx.platform.core.res.runtime.job.DataTask
    public boolean isResumeable() {
        return !isBufferFull();
    }

    private boolean isBufferFull() {
        return getStream().isWorkAreaFull() || getStream().fiberTaskCount(this.startNodeId) >= this.bufferSize;
    }

    @Override // kd.isc.iscx.platform.core.res.runtime.job.DataTask
    protected void doJob() {
        if (isBufferFull()) {
            compareAndSetState(DataTask.State.Blocked, DataTask.State.Running);
            return;
        }
        long currentTimeMillis = System.currentTimeMillis();
        try {
            try {
                prepareReader();
                doRead();
                getStream().getNodeCounter(this.startNodeId).add(System.currentTimeMillis() - currentTimeMillis);
            } catch (Throwable th) {
                throw new DataFlowException(this.startNodeId, this.fiberFlow.getNode(this.startNodeId).getTitle(), this.query.getInput(), this.params, th);
            }
        } catch (Throwable th2) {
            getStream().getNodeCounter(this.startNodeId).add(System.currentTimeMillis() - currentTimeMillis);
            throw th2;
        }
    }

    private void doRead() {
        while (!isBufferFull()) {
            SignalManager.checkCancelSignal();
            Map<String, Object> readRow = readRow();
            if (readRow == null) {
                setState(DataTask.State.Success);
                return;
            } else {
                super.enqueue(createFiberTask(readRow));
                refreshCounter();
            }
        }
    }

    private void refreshCounter() {
        this.targetIndex = this.index;
        this.targetCRC = this.crc.getValue();
        if (this.index > this.total) {
            this.total = this.index;
            getStream().getCounter().incTotalCount();
        }
    }

    private FiberTask createFiberTask(Map<String, Object> map) {
        return new FiberTask(getStream(), this.startNodeId, this.query.getOutput(), map);
    }

    private Map<String, Object> readRow() {
        Map<String, Object> map = (Map) this.reader.read();
        if (map == null) {
            return null;
        }
        this.index++;
        for (String str : this.crcFields) {
            Object obj = map.get(str);
            this.crc.update((obj == null ? "null" : obj.toString()).getBytes(D.UTF_8));
        }
        return map;
    }

    private void prepareReader() {
        if (this.reader == null) {
            this.cn = this.connector.getConnection();
            this.reader = this.query.invoke(getStream(), this.cn, this.query.getInput().getDataType().m12narrow((Object) this.params), this.bufferSize);
            this.readerSummery = this.reader.toString();
            int totalCount = this.reader.getTotalCount();
            if (totalCount >= 0) {
                if (this.total < 0) {
                    this.total = totalCount;
                    getStream().getCounter().incTotalCount(totalCount);
                } else if (this.total != totalCount) {
                    throw new IscBizException(ResManager.loadKDString("由于源系统端的数据总行数在上次执行后发生了变化，当前数据流任务无法恢复，不能重试。", "StreamTask_0", "isc-iscx-platform-core", new Object[0]));
                }
            }
            while (this.index < this.targetIndex && readRow() != null) {
            }
            if (this.crc.getValue() != this.targetCRC) {
                throw new IscBizException(ResManager.loadKDString("由于源系统端的数据内容在上次执行后发生了变化，当前数据流任务无法恢复，不能重试。", "StreamTask_1", "isc-iscx-platform-core", new Object[0]));
            }
        }
    }

    @Override // kd.isc.iscx.platform.core.res.runtime.job.DataTask
    public void close() {
        try {
            ObjectReader<? extends Map<String, Object>> objectReader = this.reader;
            this.reader = null;
            if (objectReader != null) {
                objectReader.close();
            }
        } finally {
            ConnectionWrapper connectionWrapper = this.cn;
            this.cn = null;
            if (connectionWrapper != null) {
                connectionWrapper.close();
            }
        }
    }

    @Override // kd.isc.iscx.platform.core.res.runtime.job.DataTask
    public String innerGetContextDigest() {
        return this.query.getName() + " : " + this.query.getInput().digest(this.params);
    }

    @Override // java.lang.Comparable
    public int compareTo(StreamTask streamTask) {
        return Long.compare(streamTask.getId(), getId());
    }
}
