package com.kingdee.bos.qing.dpp.client.dataset;

import com.kingdee.bos.qing.dpp.client.exception.DataSetReadException;
import com.kingdee.bos.qing.dpp.client.job.JobCanceler;
import com.kingdee.bos.qing.dpp.common.interfaces.ISocketSinkDataReceiver;
import com.kingdee.bos.qing.dpp.common.log.DppLogger;
import com.kingdee.bos.qing.dpp.common.options.QDppOptions;
import com.kingdee.bos.qing.dpp.job.exception.DataSinkException;
import com.kingdee.bos.qing.dpp.job.model.QDppJobResult;
import com.kingdee.bos.qing.dpp.job.model.QDppJobStatus;
import com.kingdee.bos.qing.dpp.job.model.SocketBatchSinkDatas;
import com.kingdee.bos.qing.dpp.rpc.ServiceRefCenter;
import com.kingdee.bos.qing.dpp.utils.DppGlobalScheduleExecutor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/kingdee/bos/qing/dpp/client/dataset/DppSocketDataSet.class */
public class DppSocketDataSet extends DppDataSet implements ISocketSinkDataReceiver {
    private static final int RESET_OPERATION = 1;
    private static final int READ_NEXT_OPERATION = 2;
    protected long readDataTimeout;
    private static final int MAX_DATA_RECEIVE_QUEUE_SIZE = ((Integer) QDppOptions.DATASET_SOCKET_MAX_QUEUE_SIZE.getValue()).intValue();
    private static final Logger log = new DppLogger("Qing-Dpp:", LoggerFactory.getLogger(DppSocketDataSet.class));
    private String refId = UUID.randomUUID().toString();
    protected volatile boolean closed = false;
    protected volatile boolean jobEnded = false;
    protected AtomicLong readRowIndex = new AtomicLong(0);
    protected AtomicLong receivedTotalRowSize = new AtomicLong(0);
    protected AtomicInteger readSegmentIndex = new AtomicInteger(-1);
    protected AtomicInteger writeSegmentIndex = new AtomicInteger(-1);
    protected Object notEmpty = new Object();
    protected List<DataSegment> dataSegmentList = Collections.synchronizedList(new ArrayList());
    private Set<Long> receivedBatchSeqSet = new HashSet(10);
    private long nextBatchSeq = 1;
    private long currentMaxReceivedSeq = -1;
    private Map<Long, SocketBatchSinkDatas> unOrderSinkDatas = new HashMap(10);
    private AtomicInteger operator = new AtomicInteger(-1);

    public DppSocketDataSet(long j) {
        this.readDataTimeout = j;
    }

    @Override // com.kingdee.bos.qing.dpp.client.dataset.DppDataSet
    protected void openInternalOnJobSucceed() throws DataSetReadException {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setFinalTotalRowCount(long j) {
        this.jobResultRef.get().setTotalProducedRowCount(j);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setSegmentWriteFinished() {
        int i;
        DataSegment dataSegment;
        this.jobEnded = isFinished(this.jobResultRef.get().getJobStatus());
        if (!this.jobEnded || (i = this.writeSegmentIndex.get()) <= -1 || null == (dataSegment = this.dataSegmentList.get(i)) || dataSegment.isWriteFinished()) {
            return;
        }
        try {
            dataSegment.setLastSegment(true);
            dataSegment.setWriteFinished(true);
        } catch (DataSinkException e) {
            log.error("", e);
        }
    }

    @Override // com.kingdee.bos.qing.dpp.client.dataset.DppDataSet
    protected boolean isDataExist() throws DataSetReadException {
        checkException();
        if (this.closed) {
            return false;
        }
        return !this.jobEnded || this.readRowIndex.get() < this.receivedTotalRowSize.get();
    }

    @Override // com.kingdee.bos.qing.dpp.client.dataset.DppDataSet
    protected boolean isDataSetUsable(QDppJobResult qDppJobResult) {
        return qDppJobResult.getJobStatus() == QDppJobStatus.DATA_READY || qDppJobResult.getJobStatus().isEndState();
    }

    private void checkException() throws DataSetReadException {
        String error = this.jobResultRef.get().getError();
        if (null != error) {
            throw new DataSetReadException("job result read error:" + error);
        }
    }

    private void waitToReadNext() {
        if (this.operator.get() == READ_NEXT_OPERATION) {
            return;
        }
        while (!this.operator.compareAndSet(-1, READ_NEXT_OPERATION)) {
            try {
                if (this.closed) {
                    throw new IllegalStateException("data set is already closed");
                }
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    @Override // com.kingdee.bos.qing.dpp.client.dataset.DppDataSet
    public DppRowData nextRow() throws DataSetReadException {
        waitToReadNext();
        int i = this.readSegmentIndex.get();
        if (i < 0) {
            waitNextSegmentDataReady(0);
            if (isAllReaded()) {
                return null;
            }
            return blockReadData(this.dataSegmentList.get(this.readSegmentIndex.incrementAndGet()));
        }
        DppRowData blockReadData = blockReadData(this.dataSegmentList.get(i));
        if (null != blockReadData) {
            return blockReadData;
        }
        waitNextSegmentDataReady(i + RESET_OPERATION);
        if (isAllReaded()) {
            return null;
        }
        return blockReadData(this.dataSegmentList.get(this.readSegmentIndex.incrementAndGet()));
    }

    private DppRowData blockReadData(DataSegment dataSegment) throws DataSetReadException {
        if (!dataSegment.isEmpty()) {
            DppRowData readRow = dataSegment.readRow();
            if (null != readRow) {
                this.readRowIndex.incrementAndGet();
            }
            return readRow;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (!dataSegment.isWriteFinished()) {
            DppRowData readRow2 = dataSegment.readRow();
            if (null != readRow2) {
                this.readRowIndex.incrementAndGet();
                return readRow2;
            }
            if (System.currentTimeMillis() - currentTimeMillis >= this.readDataTimeout) {
                throw new DataSetReadException("read data set timeout,jobName:" + getJobName());
            }
        }
        return null;
    }

    private boolean isAllReaded() {
        return this.jobEnded && this.readRowIndex.get() == this.receivedTotalRowSize.get();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void notifyDataReady() {
        synchronized (this.notEmpty) {
            this.notEmpty.notifyAll();
        }
    }

    private void waitNextSegmentDataReady(int i) throws DataSetReadException {
        int i2 = this.writeSegmentIndex.get();
        synchronized (this.notEmpty) {
            long currentTimeMillis = System.currentTimeMillis();
            while (i2 < i) {
                if (this.jobEnded) {
                    return;
                }
                if (this.closed) {
                    throw new DataSetReadException("dataSet is already closed");
                }
                if (System.currentTimeMillis() - currentTimeMillis >= this.readDataTimeout) {
                    throw new DataSetReadException("read data timeout，jobName:" + getJobName());
                }
                try {
                    this.notEmpty.wait(200L);
                    i2 = this.writeSegmentIndex.get();
                } catch (InterruptedException e) {
                    throw new DataSetReadException("read interrupted", e);
                }
            }
        }
    }

    @Override // com.kingdee.bos.qing.dpp.client.dataset.DppDataSet
    public void close() {
        this.closed = true;
        if (!isFinished(this.jobResultRef.get().getJobStatus())) {
            DppGlobalScheduleExecutor.submitNow(new JobCanceler(this.jobResultRef.get().getJobHexId()));
        }
        ServiceRefCenter.getInstance().freeRef(this.refId);
        this.dataSegmentList.forEach(dataSegment -> {
            dataSegment.close();
        });
    }

    public long receiveData(SocketBatchSinkDatas socketBatchSinkDatas) throws DataSinkException {
        SocketBatchSinkDatas remove;
        if (this.closed) {
            return -1L;
        }
        synchronized (this) {
            long seq = socketBatchSinkDatas.getSeq();
            this.currentMaxReceivedSeq = Math.max(seq, this.currentMaxReceivedSeq);
            if (this.receivedBatchSeqSet.contains(Long.valueOf(seq))) {
                return this.nextBatchSeq;
            }
            this.receivedBatchSeqSet.add(Long.valueOf(seq));
            if (seq != this.nextBatchSeq) {
                this.unOrderSinkDatas.put(Long.valueOf(seq), socketBatchSinkDatas);
                return this.nextBatchSeq;
            }
            writeDataIntoSegment(socketBatchSinkDatas);
            this.nextBatchSeq++;
            while (this.nextBatchSeq <= this.currentMaxReceivedSeq && null != (remove = this.unOrderSinkDatas.remove(Long.valueOf(this.nextBatchSeq)))) {
                writeDataIntoSegment(remove);
                this.nextBatchSeq++;
            }
            return this.nextBatchSeq;
        }
    }

    protected void writeDataIntoSegment(SocketBatchSinkDatas socketBatchSinkDatas) throws DataSinkException {
        List<Object[]> rowDatas = socketBatchSinkDatas.getRowDatas();
        if (rowDatas.isEmpty()) {
            return;
        }
        long addAndGet = this.receivedTotalRowSize.addAndGet(rowDatas.size());
        int i = this.writeSegmentIndex.get();
        if (i == -1) {
            LocalMemoryDataSegment localMemoryDataSegment = new LocalMemoryDataSegment(getJobName(), getRowMeta());
            localMemoryDataSegment.setSegmentIndex(i + RESET_OPERATION);
            localMemoryDataSegment.writeRows(rowDatas);
            this.dataSegmentList.add(localMemoryDataSegment);
            this.writeSegmentIndex.incrementAndGet();
            notifyDataReady();
            return;
        }
        DataSegment dataSegment = this.dataSegmentList.get(i);
        if (!dataSegment.isFull()) {
            dataSegment.writeRows(rowDatas);
            return;
        }
        dataSegment.setWriteFinished(true);
        if (addAndGet - this.readRowIndex.get() <= MAX_DATA_RECEIVE_QUEUE_SIZE) {
            LocalMemoryDataSegment localMemoryDataSegment2 = new LocalMemoryDataSegment(getJobName(), getRowMeta());
            localMemoryDataSegment2.setSegmentIndex(i + RESET_OPERATION);
            localMemoryDataSegment2.writeRows(rowDatas);
            this.dataSegmentList.add(localMemoryDataSegment2);
            this.writeSegmentIndex.incrementAndGet();
            notifyDataReady();
            return;
        }
        FileDataSegment fileDataSegment = new FileDataSegment(getJobName(), getRowMeta());
        fileDataSegment.setSegmentIndex(i + RESET_OPERATION);
        fileDataSegment.writeRows(rowDatas);
        this.dataSegmentList.add(fileDataSegment);
        this.writeSegmentIndex.incrementAndGet();
        notifyDataReady();
    }

    public List<Long> checkSeqReceiveState(Set<Long> set) {
        ArrayList arrayList = new ArrayList(set.size());
        set.forEach(l -> {
            if (this.receivedBatchSeqSet.contains(l)) {
                return;
            }
            arrayList.add(l);
        });
        Collections.sort(arrayList);
        return arrayList;
    }

    public String getRefId() {
        return this.refId;
    }

    @Override // com.kingdee.bos.qing.dpp.client.dataset.DppDataSet
    public boolean reset() {
        if (this.readRowIndex.get() > 0 || this.closed || !this.operator.compareAndSet(-1, RESET_OPERATION)) {
            return false;
        }
        try {
            this.receivedTotalRowSize.set(0L);
            this.readSegmentIndex.set(-1);
            this.writeSegmentIndex.set(-1);
            this.receivedBatchSeqSet.clear();
            this.nextBatchSeq = 1L;
            this.currentMaxReceivedSeq = -1L;
            this.unOrderSinkDatas.clear();
            Iterator it = new ArrayList(this.dataSegmentList).iterator();
            while (it.hasNext()) {
                ((DataSegment) it.next()).close();
            }
            this.readRowIndex.set(0L);
            this.operator.set(-1);
            return true;
        } catch (Throwable th) {
            this.operator.set(-1);
            throw th;
        }
    }
}
