package com.kingdee.bos.qing.dpp.client.dataset;

import com.kingdee.bos.qing.common.rpc.codec.compression.CompressionFactory;
import com.kingdee.bos.qing.common.rpc.codec.compression.CompressionType;
import com.kingdee.bos.qing.common.rpc.codec.serialization.SerializationFactory;
import com.kingdee.bos.qing.common.rpc.codec.serialization.SerializationType;
import com.kingdee.bos.qing.common.rpc.codec.serialization.inbound.ObjectInput;
import com.kingdee.bos.qing.common.rpc.codec.serialization.outbound.ObjectOutput;
import com.kingdee.bos.qing.common.session.IGlobalQingSession;
import com.kingdee.bos.qing.common.session.QingSessionUtil;
import com.kingdee.bos.qing.dpp.client.exception.DataSetReadException;
import com.kingdee.bos.qing.dpp.common.log.DppLogger;
import com.kingdee.bos.qing.dpp.job.exception.DataSinkException;
import com.kingdee.bos.qing.dpp.model.schema.DppField;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/kingdee/bos/qing/dpp/client/dataset/GlobalDataSegment.class */
public class GlobalDataSegment extends DataSegment {
    private static final Logger logger = new DppLogger("Qing-Dpp:", LoggerFactory.getLogger(GlobalDataSegment.class));
    private int batchDataWriteIndex;
    private int totalWriteSize;
    private int readRowIndex;
    private LinkedBlockingQueue<Integer> batchDataIndexes;
    private LinkedList<DppRowData> batchRowDatas;
    private IGlobalQingSession globalQingSession;

    public GlobalDataSegment(String str, List<DppField> list) {
        super(str, list);
        this.batchDataWriteIndex = 0;
        this.totalWriteSize = 0;
        this.readRowIndex = 0;
        this.batchDataIndexes = new LinkedBlockingQueue<>();
        this.batchRowDatas = new LinkedList<>();
        this.globalQingSession = QingSessionUtil.getGlobalQingSessionImpl();
    }

    @Override // com.kingdee.bos.qing.dpp.client.dataset.DataSegment
    public DppRowData readRow() throws DataSetReadException {
        if (this.batchRowDatas.size() > 0) {
            this.readRowIndex++;
            return this.batchRowDatas.removeFirst();
        }
        ObjectInput objectInput = null;
        try {
            try {
                Integer poll = this.batchDataIndexes.poll(1000L, TimeUnit.MILLISECONDS);
                if (null == poll) {
                    return null;
                }
                byte[] byteData = this.globalQingSession.getByteData(createBatchRowKey(poll.intValue()));
                if (null == byteData || byteData.length == 0) {
                    throw new DataSetReadException("batch row data byte size is zero");
                }
                ObjectInput deserializeInput = SerializationFactory.createSerialization(SerializationType.KRYO.getType()).getDeserializeInput(CompressionFactory.getCompression(CompressionType.SNAPPY.getType()).createInput(new ByteBufInputStream(Unpooled.wrappedBuffer(byteData))));
                List<DppRowData> rowDataList = ((BatchRowDatas) deserializeInput.readObject(BatchRowDatas.class)).getRowDataList();
                if (null == rowDataList || rowDataList.isEmpty()) {
                    throw new DataSetReadException("batch row data is empty");
                }
                this.batchRowDatas.addAll(rowDataList);
                this.readRowIndex++;
                DppRowData removeFirst = this.batchRowDatas.removeFirst();
                if (null != deserializeInput) {
                    deserializeInput.close();
                }
                return removeFirst;
            } catch (Exception e) {
                throw new DataSetReadException("data read error", e);
            }
        } finally {
            if (false) {
                objectInput.close();
            }
        }
    }

    /* JADX WARN: Finally extract failed */
    @Override // com.kingdee.bos.qing.dpp.client.dataset.DataSegment
    protected void internalWrite(List<Object[]> list) throws DataSinkException {
        ArrayList arrayList = new ArrayList(list.size());
        list.forEach(objArr -> {
            arrayList.add(new DppRowData(objArr, this.rowMeta));
        });
        BatchRowDatas batchRowDatas = new BatchRowDatas();
        batchRowDatas.setRowDataList(arrayList);
        ByteBuf buffer = Unpooled.buffer(10240);
        ObjectOutput objectOutput = null;
        try {
            try {
                objectOutput = SerializationFactory.createSerialization(SerializationType.KRYO.getType()).getSerializeOutput(CompressionFactory.getCompression(CompressionType.SNAPPY.getType()).createOutput(new ByteBufOutputStream(buffer)));
                objectOutput.writeObject(batchRowDatas);
                objectOutput.flush();
                byte[] bArr = new byte[buffer.readableBytes()];
                System.arraycopy(buffer.array(), 0, bArr, 0, bArr.length);
                this.globalQingSession.setBytesData(createBatchRowKey(this.batchDataWriteIndex + 1), bArr, 600);
                this.batchDataWriteIndex++;
                if (!this.batchDataIndexes.offer(Integer.valueOf(this.batchDataWriteIndex))) {
                    logger.warn("batchDataIndexes insert failed");
                }
                this.totalWriteSize += arrayList.size();
                if (null != objectOutput) {
                    objectOutput.close();
                }
            } catch (Exception e) {
                throw new DataSinkException("encode row data failed", e);
            }
        } catch (Throwable th) {
            if (null != objectOutput) {
                objectOutput.close();
            }
            throw th;
        }
    }

    private String createBatchRowKey(int i) {
        return getJobName() + "_" + getSegmentIndex() + "_" + i;
    }

    @Override // com.kingdee.bos.qing.dpp.client.dataset.DataSegment
    public boolean isEmpty() {
        return this.batchDataIndexes.isEmpty() && this.batchRowDatas.isEmpty();
    }

    @Override // com.kingdee.bos.qing.dpp.client.dataset.DataSegment
    protected int getMaxCapacity() {
        return 1000;
    }

    @Override // com.kingdee.bos.qing.dpp.client.dataset.DataSegment
    protected int getNotReadSize() {
        return this.totalWriteSize - this.readRowIndex;
    }

    @Override // com.kingdee.bos.qing.dpp.client.dataset.DataSegment
    public void close() {
        for (int i = 0; i < this.batchDataWriteIndex; i++) {
            this.globalQingSession.remove(createBatchRowKey(i));
        }
    }
}
