package com.kingdee.bos.qing.dpp.client.dataset;

import com.kingdee.bos.qing.common.rpc.codec.compression.CompressionFactory;
import com.kingdee.bos.qing.common.rpc.codec.compression.CompressionType;
import com.kingdee.bos.qing.common.rpc.codec.serialization.SerializationFactory;
import com.kingdee.bos.qing.common.rpc.codec.serialization.SerializationType;
import com.kingdee.bos.qing.common.rpc.codec.serialization.inbound.ObjectInput;
import com.kingdee.bos.qing.common.rpc.codec.serialization.outbound.ObjectOutput;
import com.kingdee.bos.qing.dpp.client.common.file.SegmentFileAutoCloser;
import com.kingdee.bos.qing.dpp.client.exception.DataSetReadException;
import com.kingdee.bos.qing.dpp.client.job.JobRuntimeCache;
import com.kingdee.bos.qing.dpp.common.log.DppLogger;
import com.kingdee.bos.qing.dpp.common.options.QDppOptions;
import com.kingdee.bos.qing.dpp.job.exception.DataSinkException;
import com.kingdee.bos.qing.dpp.model.file.BinaryFileSegmentRecord;
import com.kingdee.bos.qing.dpp.model.file.OneFileSegmentInfo;
import com.kingdee.bos.qing.dpp.model.schema.DppField;
import com.kingdee.bos.qing.dpp.utils.DataConvertUtil;
import com.kingdee.bos.qing.filesystem.manager.AbstractQingFile;
import com.kingdee.bos.qing.filesystem.manager.FileFactory;
import com.kingdee.bos.qing.filesystem.manager.api.IQingFileWriter;
import com.kingdee.bos.qing.filesystem.manager.model.QingTempFileType;
import com.kingdee.bos.qing.filesystem.stream.QingInputStream;
import com.kingdee.bos.qing.filesystem.stream.QingOutputStream;
import com.kingdee.bos.qing.util.CloseUtil;
import com.kingdee.bos.qing.util.LogUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/kingdee/bos/qing/dpp/client/dataset/FileDataSegment.class */
public class FileDataSegment extends DataSegment {
    private static final Logger log = new DppLogger("Qing-Dpp:", LoggerFactory.getLogger(FileDataSegment.class));
    private QingOutputStream qingOutputStream;
    private AbstractQingFile qingFile;
    private IQingFileWriter fileWriter;
    private Object fileReady;
    private boolean fileFinished;
    private int writeRowCount;
    private QingInputStream qingInputStream;
    private int maxRowCount;
    private LinkedList<DppRowData> batchRowDatas;
    private boolean readFinished;
    private OneFileSegmentInfo segmentInfo;
    private boolean deleteOnClose;
    private AtomicBoolean closed;
    private long recentRWTime;

    public FileDataSegment(String str, List<DppField> list) {
        super(str, list);
        this.qingOutputStream = null;
        this.fileReady = new Object();
        this.fileFinished = false;
        this.writeRowCount = 0;
        this.batchRowDatas = new LinkedList<>();
        this.readFinished = false;
        this.segmentInfo = new OneFileSegmentInfo();
        this.deleteOnClose = true;
        this.closed = new AtomicBoolean(false);
        this.recentRWTime = System.currentTimeMillis();
        this.maxRowCount = 2000;
    }

    public FileDataSegment(String str, List<DppField> list, long j) {
        super(str, list);
        this.qingOutputStream = null;
        this.fileReady = new Object();
        this.fileFinished = false;
        this.writeRowCount = 0;
        this.batchRowDatas = new LinkedList<>();
        this.readFinished = false;
        this.segmentInfo = new OneFileSegmentInfo();
        this.deleteOnClose = true;
        this.closed = new AtomicBoolean(false);
        this.recentRWTime = System.currentTimeMillis();
        this.segmentInfo.setJobName(str);
        this.segmentInfo.setStartRowIndex(j);
        this.deleteOnClose = false;
        this.maxRowCount = ((Integer) QDppOptions.DATASET_BINARY_FILE_SEGMENT_MAX_ROW_SIZE.getValue()).intValue();
    }

    @Override // com.kingdee.bos.qing.dpp.client.dataset.DataSegment
    public DppRowData readRow() throws DataSetReadException {
        waitFileWriteFinish();
        if (this.fileFinished) {
            return readDataFromFile();
        }
        return null;
    }

    private void waitFileWriteFinish() {
        synchronized (this.fileReady) {
            if (!this.fileFinished) {
                try {
                    this.fileReady.wait(1000L);
                } catch (InterruptedException e) {
                }
            }
        }
    }

    private DppRowData readDataFromFile() throws DataSetReadException {
        touch();
        initInputStreamAtFirst();
        if (this.batchRowDatas.size() > 0) {
            return this.batchRowDatas.removeFirst();
        }
        ObjectInput objectInput = null;
        try {
            try {
                byte[] bArr = new byte[4];
                int read = this.qingInputStream.read(bArr);
                if (read != 4) {
                    if (read == -1) {
                        this.readFinished = true;
                    }
                }
                int bytesToInt = DataConvertUtil.bytesToInt(bArr, 0);
                byte[] bArr2 = new byte[bytesToInt];
                int read2 = this.qingInputStream.read(bArr2);
                if (read2 == -1 || read2 != bytesToInt) {
                    throw new DataSetReadException("read row content failed,file data segment is damaged");
                }
                ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(bArr2);
                ObjectInput deserializeInput = SerializationFactory.createSerialization(SerializationType.KRYO.getType()).getDeserializeInput(CompressionFactory.getCompression(CompressionType.LZ4.getType()).createInput(new ByteBufInputStream(wrappedBuffer)));
                List<DppRowData> rowDataList = ((BatchRowDatas) deserializeInput.readObject(BatchRowDatas.class)).getRowDataList();
                wrappedBuffer.release();
                if (null == rowDataList || rowDataList.size() <= 0) {
                    if (null != deserializeInput) {
                        deserializeInput.close();
                    }
                    return null;
                }
                for (DppRowData dppRowData : rowDataList) {
                    dppRowData.setRowMeta(this.rowMeta);
                    this.batchRowDatas.add(dppRowData);
                }
                DppRowData removeFirst = this.batchRowDatas.removeFirst();
                if (null != deserializeInput) {
                    deserializeInput.close();
                }
                return removeFirst;
            } catch (Exception e) {
                throw new DataSetReadException("read failed", e);
            }
        } finally {
            if (false) {
                objectInput.close();
            }
        }
    }

    @Override // com.kingdee.bos.qing.dpp.client.dataset.DataSegment
    protected int getMaxCapacity() {
        return this.maxRowCount;
    }

    private void initInputStreamAtFirst() throws DataSetReadException {
        if (null == this.qingInputStream) {
            if (!this.qingFile.exists()) {
                throw new DataSetReadException("fragment file not exist,jobName:" + getJobName());
            }
            try {
                this.qingInputStream = this.qingFile.getInputStream();
            } catch (IOException e) {
                throw new DataSetReadException("get input stream failed,jobName:" + getJobName(), e);
            }
        }
    }

    /* JADX WARN: Finally extract failed */
    @Override // com.kingdee.bos.qing.dpp.client.dataset.DataSegment
    protected void internalWrite(List<Object[]> list) throws DataSinkException {
        if (this.closed.get()) {
            return;
        }
        touch();
        if (null == this.qingOutputStream) {
            this.qingFile = FileFactory.newTempFile(QingTempFileType.DS_CACHE);
            this.fileWriter = this.qingFile.createWriter();
            this.segmentInfo.setFileName(this.qingFile.getName());
            try {
                try {
                    this.qingOutputStream = this.fileWriter.getOutputStream();
                    SegmentFileAutoCloser.getInstance().register(this);
                    if (this.segmentInfo.isFailed()) {
                        this.qingFile.delete();
                    }
                } catch (IOException e) {
                    this.segmentInfo.setStatus(1);
                    doFinishSegment();
                    throw new DataSinkException("create output stream failed", e);
                }
            } catch (Throwable th) {
                if (this.segmentInfo.isFailed()) {
                    this.qingFile.delete();
                }
                throw th;
            }
        }
        ArrayList arrayList = new ArrayList(list.size());
        list.forEach(objArr -> {
            arrayList.add(new DppRowData(objArr, null));
        });
        BatchRowDatas batchRowDatas = new BatchRowDatas();
        batchRowDatas.setRowDataList(arrayList);
        ByteBuf buffer = Unpooled.buffer(10240);
        OutputStream outputStream = null;
        ObjectOutput objectOutput = null;
        try {
            try {
                outputStream = CompressionFactory.getCompression(CompressionType.LZ4.getType()).createOutput(new ByteBufOutputStream(buffer));
                objectOutput = SerializationFactory.createSerialization(SerializationType.KRYO.getType()).getSerializeOutput(outputStream);
                objectOutput.writeObject(batchRowDatas);
                objectOutput.flush();
                byte[] bArr = new byte[buffer.readableBytes()];
                System.arraycopy(buffer.array(), 0, bArr, 0, bArr.length);
                this.qingOutputStream.write(DataConvertUtil.intToBytes(bArr.length));
                this.qingOutputStream.write(bArr);
                this.qingOutputStream.flush();
                this.writeRowCount += arrayList.size();
                if (null != objectOutput) {
                    objectOutput.close();
                }
                buffer.release();
                CloseUtil.close(new Closeable[]{outputStream});
            } catch (Throwable th2) {
                if (null != objectOutput) {
                    objectOutput.close();
                }
                buffer.release();
                CloseUtil.close(new Closeable[]{outputStream});
                throw th2;
            }
        } catch (Exception e2) {
            this.segmentInfo.setStatus(1);
            close();
            doFinishSegment();
            throw new DataSinkException("encode row data failed", e2);
        }
    }

    private void touch() {
        this.recentRWTime = System.currentTimeMillis();
    }

    private void doFinishSegment() throws DataSinkException {
        BinaryFileSegmentRecord binaryFileSegmentRecord;
        String jobName = this.segmentInfo.getJobName();
        if (this.segmentInfo.getStatus() == 1 && null != (binaryFileSegmentRecord = JobRuntimeCache.getBinaryFileSegmentRecord(jobName))) {
            binaryFileSegmentRecord.setStatus(2);
            JobRuntimeCache.cacheBinaryFileSegmentRecord(jobName, binaryFileSegmentRecord);
        }
        if (this.segmentInfo.getSegmentIndex() == 0) {
            BinaryFileSegmentRecord binaryFileSegmentRecord2 = new BinaryFileSegmentRecord();
            updateSegmentRecord(this.segmentInfo, jobName, binaryFileSegmentRecord2);
            JobRuntimeCache.cacheBinaryFileSegmentRecord(jobName, binaryFileSegmentRecord2);
        } else {
            BinaryFileSegmentRecord binaryFileSegmentRecord3 = JobRuntimeCache.getBinaryFileSegmentRecord(jobName);
            if (null != binaryFileSegmentRecord3) {
                updateSegmentRecord(this.segmentInfo, jobName, binaryFileSegmentRecord3);
                JobRuntimeCache.cacheBinaryFileSegmentRecord(jobName, binaryFileSegmentRecord3);
            }
        }
    }

    private void updateSegmentRecord(OneFileSegmentInfo oneFileSegmentInfo, String str, BinaryFileSegmentRecord binaryFileSegmentRecord) throws DataSinkException {
        try {
            binaryFileSegmentRecord.addNewFileSegmentInfo(oneFileSegmentInfo);
            binaryFileSegmentRecord.setStatus(oneFileSegmentInfo.isLast() ? 1 : 0);
            binaryFileSegmentRecord.setJobName(str);
        } catch (DataSinkException e) {
            binaryFileSegmentRecord.setStatus(2);
            throw e;
        }
    }

    public boolean isNeedAutoClose() {
        return !this.closed.get() && System.currentTimeMillis() - this.recentRWTime >= ((Long) QDppOptions.DATASET_BINARY_FILE_SEGMENT_AUTOCLOSE_TIME.getValue()).longValue();
    }

    @Override // com.kingdee.bos.qing.dpp.client.dataset.DataSegment
    public void setSegmentIndex(int i) {
        super.setSegmentIndex(i);
        this.segmentInfo.setSegmentIndex(i);
    }

    @Override // com.kingdee.bos.qing.dpp.client.dataset.DataSegment
    public void setLastSegment(boolean z) {
        super.setLastSegment(z);
        if (z) {
            this.segmentInfo.setLast(true);
        }
    }

    @Override // com.kingdee.bos.qing.dpp.client.dataset.DataSegment
    public void setWriteFinished(boolean z) throws DataSinkException {
        if (this.writeFinished.compareAndSet(false, true)) {
            try {
                if (null != this.qingOutputStream) {
                    this.qingOutputStream.close();
                    this.fileWriter.close((Exception) null);
                }
            } catch (IOException e) {
                log.error("close file segment output stream error", e);
            }
            this.fileFinished = true;
            synchronized (this.fileReady) {
                this.fileReady.notifyAll();
            }
            this.segmentInfo.setStatus(0);
            this.segmentInfo.setTotalWriteCount(this.writeRowCount);
            doFinishSegment();
        }
    }

    @Override // com.kingdee.bos.qing.dpp.client.dataset.DataSegment
    public boolean isEmpty() {
        return this.readFinished;
    }

    @Override // com.kingdee.bos.qing.dpp.client.dataset.DataSegment
    protected int getNotReadSize() {
        return this.writeRowCount;
    }

    @Override // com.kingdee.bos.qing.dpp.client.dataset.DataSegment
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            try {
                setWriteFinished(true);
            } catch (DataSinkException e) {
                LogUtil.error("set write finish error when closing file segment data ", e);
            }
            CloseUtil.close(new Closeable[]{this.qingInputStream});
            if (this.deleteOnClose && null != this.qingFile && this.qingFile.exists()) {
                this.qingFile.delete();
            }
            SegmentFileAutoCloser.getInstance().unRegister(this.segmentInfo.getFileName());
        }
    }

    public OneFileSegmentInfo getSegmentInfo() {
        return this.segmentInfo;
    }

    public void closeForRead() {
        CloseUtil.close(new Closeable[]{this.qingInputStream});
    }
}
