package kd.bos.ais.core;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;

/* loaded from: input_file:kd/bos/ais/core/BatchProcessor.class */
/**
 * Collects individual elements through an LMAX Disruptor ring buffer and hands
 * them to a {@link Consumer} in batches.
 *
 * <p>Producers call {@link #put(Object)} or {@link #put(List)}. A single event
 * handler registered with the Disruptor accumulates the published elements and
 * invokes the consumer when either {@code batchSize} elements have been
 * gathered or the handler has caught up with the producers (the Disruptor's
 * {@code endOfBatch} flag).
 *
 * <p>The list passed to the consumer is an internal buffer that is reused and
 * cleared as soon as {@code accept} returns; consumers must copy it if they
 * need the elements afterwards.
 *
 * @param <T> type of the elements being batched
 */
public class BatchProcessor<T> {
    private static final Log log = LogFactory.getLog(BatchProcessor.class);

    /** Number of slots in the ring buffer; also the largest range claimed at once. */
    private final int bufferSize;
    /** Number of accumulated elements that triggers a flush to the consumer. */
    private final int batchSize;
    /** Receives every flushed batch on the event-handler thread. */
    private final Consumer<List<T>> consumer;
    /** Kept so that the handler thread can be stopped via {@link #shutdown()}. */
    private final Disruptor<Element<T>> disruptor;
    private final RingBuffer<Element<T>> ringBuffer;

    /**
     * Mutable ring-buffer slot holding one element on its way from a producer
     * to the batching handler.
     *
     * @param <T> type of the carried value
     */
    public static class Element<T> {
        private T value;

        private Element() {
        }

        /** @return the value currently stored in this slot, possibly {@code null} */
        public T get() {
            return this.value;
        }

        /** @param t value to store in this slot; {@code null} empties it */
        public void set(T t) {
            this.value = t;
        }
    }

    /**
     * Creates the processor and immediately starts the underlying Disruptor.
     *
     * @param bufferSize    ring buffer capacity, passed to the Disruptor unchanged
     * @param producerType  single- or multi-producer mode
     * @param waitStrategy  strategy the handler uses to wait for new events
     * @param threadFactory creates the event-handler thread
     * @param consumer      receives each batch; must not be {@code null}
     * @param batchSize     number of buffered elements that forces a flush
     * @throws NullPointerException if {@code consumer} is {@code null}
     */
    public BatchProcessor(int bufferSize, ProducerType producerType, WaitStrategy waitStrategy, ThreadFactory threadFactory, Consumer<List<T>> consumer, int batchSize) {
        if (consumer == null) {
            // Fail here rather than on the handler thread for every batch.
            throw new NullPointerException("consumer");
        }
        // Assign everything the handler reads before the handler thread is started.
        this.consumer = consumer;
        this.bufferSize = bufferSize;
        this.batchSize = batchSize;

        EventFactory<Element<T>> slotFactory = Element::new;
        this.disruptor = new Disruptor<>(slotFactory, bufferSize, threadFactory, producerType, waitStrategy);
        this.disruptor.handleEventsWith(newBatchingHandler());
        this.disruptor.start();
        this.ringBuffer = this.disruptor.getRingBuffer();
    }

    /**
     * Publishes a single element.
     *
     * @param t element to enqueue
     */
    public void put(T t) {
        long sequence = this.ringBuffer.next();
        try {
            this.ringBuffer.get(sequence).set(t);
        } finally {
            // A claimed sequence must always be published, otherwise the
            // handler would stall waiting for it.
            this.ringBuffer.publish(sequence);
        }
    }

    /**
     * Publishes all elements of {@code list} in iteration order.
     *
     * <p>An empty list is a no-op. Lists longer than the ring buffer are
     * published in several consecutive ranges of at most {@code bufferSize}
     * slots each; with multiple producers, events from other producers may be
     * interleaved between those ranges.
     *
     * <p>If reading from {@code list} fails part-way, the range claimed so far
     * is still published (a claimed range cannot be given back) and the
     * exception is rethrown; slots that were not filled carry {@code null}.
     *
     * @param list elements to enqueue
     * @throws NullPointerException if {@code list} is {@code null}
     */
    public void put(List<T> list) {
        int total = list.size();
        int offset = 0;
        while (offset < total) {
            // Never claim more slots than the ring buffer has.
            int chunk = Math.min(total - offset, this.bufferSize);
            long hi = this.ringBuffer.next(chunk);
            long lo = hi - (chunk - 1);
            try {
                for (int i = 0; i < chunk; i++) {
                    this.ringBuffer.get(lo + i).set(list.get(offset + i));
                }
            } finally {
                // Publish once, after every slot in the range has been written,
                // so the handler never observes a half-filled range.
                this.ringBuffer.publish(lo, hi);
            }
            offset += chunk;
        }
    }

    /**
     * Shuts the underlying Disruptor down by delegating to
     * {@code Disruptor.shutdown()}. Callers should stop invoking {@code put}
     * before calling this method.
     */
    public void shutdown() {
        this.disruptor.shutdown();
    }

    /**
     * Builds the event handler that accumulates elements into a private
     * buffer and flushes them to the consumer.
     */
    private EventHandler<Element<T>> newBatchingHandler() {
        final List<T> cache = new ArrayList<>(this.batchSize);
        return (element, sequence, endOfBatch) -> collect(cache, element, endOfBatch);
    }

    /**
     * Adds one element to {@code cache} and flushes the cache to the consumer
     * when it is full or when the handler has caught up with the producers.
     */
    private void collect(List<T> cache, Element<T> element, boolean endOfBatch) {
        cache.add(element.get());
        // Drop the slot's reference so consumed values are not kept alive
        // until the slot happens to be overwritten.
        element.set(null);

        if (!endOfBatch && cache.size() < this.batchSize) {
            return;
        }
        try {
            log.info(String.format("start batch consume, this batch size is %s. endOfBatch: %s", cache.size(), endOfBatch));
            this.consumer.accept(cache);
            log.info("batch end consume.");
        } catch (Exception e) {
            log.warn("consume error: " + e.getMessage(), e);
        } finally {
            // Always reset the buffer: keeping a failed batch would make it
            // grow without bound and re-deliver the same elements on every
            // subsequent event.
            cache.clear();
        }
    }
}
