package kd.bos.dts.consume.impl.storageQueue;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import kd.bos.algo.util.OutUtil;
import kd.bos.dts.consume.CAccountInfo;
import kd.bos.dts.exception.ExceptionLogger;

/* loaded from: input_file:kd/bos/dts/consume/impl/storageQueue/StorageQueue.class */
public class StorageQueue<E> {
    private QueueMeta queueMeta;
    private final DtsDataConsumerAsyncTask<E> task;
    private LinkedBlockingQueue<QueueItem<E>> queue = new LinkedBlockingQueue<>(2000);
    private static int maxPatch = 5000;
    private static volatile AtomicLong _id = new AtomicLong(0);

    public StorageQueue(CAccountInfo cAccountInfo) {
        this.task = new DtsDataConsumerAsyncTask<>(cAccountInfo, this);
        this.queueMeta = StorageQueueStaus.getQueueMeta(cAccountInfo);
    }

    public void enqueue(E e) {
        try {
            QueueItem<E> queueItem = new QueueItem<>(e);
            QueueConsumeExecutor.addTask(this.task);
            queueItem.setMessageId(_id.incrementAndGet());
            this.queue.put(queueItem);
        } catch (InterruptedException e2) {
            ExceptionLogger.error(StorageQueue.class, "enqueue error", e2);
        }
        this.queueMeta.setEndIndex(0L);
    }

    public List<QueueItem<E>> dequeue() {
        ArrayList arrayList = new ArrayList(8);
        if (this.queue.size() > 0) {
            this.queue.drainTo(arrayList, maxPatch);
        }
        return arrayList;
    }

    public void ack(long j) {
        OutUtil.getSystemOut().println("ack message " + _id);
    }
}
