package kd.bos.xdb.transport.exchanger;

import java.util.ArrayList;
import java.util.List;
import kd.bos.xdb.XDBManagerConstant;
import kd.bos.xdb.transport.channel.Channel;
import kd.bos.xdb.transport.record.PausedRecord;
import kd.bos.xdb.transport.record.Record;
import kd.bos.xdb.transport.record.TerminateRecord;

/* loaded from: input_file:kd/bos/xdb/transport/exchanger/BufferdRecordExchanger.class */
/**
 * Exchanges {@link Record}s with a {@link Channel} in batches through a local buffer.
 *
 * <p>Sending side: {@link #sendToWriter(Record)} appends to the buffer, and the buffer is handed
 * to the channel with {@code pushAll} once it holds {@code bufferSize} records or when
 * {@link #flush()}, {@link #terminate()} or {@link #pause()} is called.
 *
 * <p>Receiving side: {@link #getFromReader()} walks the buffer and calls {@link #receive()}
 * (which delegates to {@code pullAll}) whenever the buffered records are exhausted.
 *
 * <p>Both sides share the same {@code buffer} and {@code bufferIndex} fields.
 * NOTE(review): presumably a given instance is used for only one of the two roles; confirm
 * against the callers.
 */
public class BufferdRecordExchanger implements RecordSender, RecordReceiver {
    /** Channel the buffered records are pushed to or pulled from. */
    private final Channel channel;

    /** Read cursor on the receiving side; number of buffered records on the sending side. */
    private int bufferIndex = 0;

    /**
     * Set to {@code true} by {@link #shutDown()}. No method of this class reads the flag: the
     * per-method checks that used to reference it had empty bodies and therefore no effect.
     */
    private volatile boolean shutDown = false;

    /**
     * Record count at which {@link #sendToWriter(Record)} flushes; overwritten by
     * {@link #receive()} with the size of the batch just pulled. Must stay declared before
     * {@code buffer}, whose initializer reads it.
     */
    private int bufferSize = XDBManagerConstant.BUFFER_SIZE;

    /** Local batch of records, pre-sized to the initial {@code bufferSize}. */
    private final List<Record> buffer = new ArrayList<>(this.bufferSize);

    /**
     * Creates an exchanger bound to the given channel.
     *
     * @param channel channel used for every push, pull and clear operation
     */
    public BufferdRecordExchanger(Channel channel) {
        this.channel = channel;
    }

    /**
     * Returns the next buffered record, refilling the buffer from the channel first when all
     * buffered records have already been consumed.
     *
     * @return the next record, or {@code null} if the next record is a {@link TerminateRecord}
     * @throws IndexOutOfBoundsException if the buffer holds no record at the cursor position
     *     after the refill, e.g. when the refill left the buffer empty
     */
    @Override
    public Record getFromReader() {
        if (this.bufferIndex >= this.buffer.size()) {
            receive();
        }
        Record record = this.buffer.get(this.bufferIndex);
        this.bufferIndex++;
        // A terminate marker is reported to the caller as null rather than as a record.
        return record instanceof TerminateRecord ? null : record;
    }

    /**
     * Appends a record to the buffer, flushing the buffer to the channel first if it already
     * holds {@code bufferSize} records.
     *
     * @param record record to buffer
     */
    @Override
    public void sendToWriter(Record record) {
        if (this.bufferIndex >= this.bufferSize) {
            flush();
        }
        this.buffer.add(record);
        this.bufferIndex++;
    }

    /** Flushes any buffered records, then pushes a {@link TerminateRecord} to the channel. */
    @Override
    public void terminate() {
        flush();
        this.channel.pushTerminate(TerminateRecord.get());
    }

    /**
     * Flushes any buffered records, then pushes a {@link PausedRecord} followed by a
     * {@link TerminateRecord} to the channel.
     */
    @Override
    public void pause() {
        flush();
        this.channel.pushPaused(PausedRecord.get());
        this.channel.pushTerminate(TerminateRecord.get());
    }

    /** Pushes the whole buffer to the channel, then resets the cursor and empties the buffer. */
    @Override
    public void flush() {
        this.channel.pushAll(this.buffer);
        this.bufferIndex = 0;
        this.buffer.clear();
    }

    /**
     * Asks the channel to fill the buffer via {@code pullAll}, rewinds the read cursor and
     * records the resulting buffer size as the new {@code bufferSize}.
     *
     * <p>NOTE(review): the buffer is not cleared here, so this relies on {@code pullAll}
     * replacing the previous contents; confirm against the {@code Channel} implementation.
     */
    @Override
    public void receive() {
        this.channel.pullAll(this.buffer);
        this.bufferIndex = 0;
        this.bufferSize = this.buffer.size();
    }

    /** Marks this exchanger as shut down and clears both the local buffer and the channel. */
    @Override
    public void shutDown() {
        this.shutDown = true;
        this.buffer.clear();
        this.channel.clear();
    }
}
