package kd.bos.fake.mq.rabbitmqfake.queue;

import kd.bos.fake.mq.rabbitmqfake.Consume;
import kd.bos.threads.ThreadPool;
import kd.bos.threads.ThreadPools;

/* loaded from: input_file:kd/bos/fake/mq/rabbitmqfake/queue/QueueFake.class */
public class QueueFake {
    private LimitQueue<Object> queue = new LimitQueue<>(2000);
    private static ThreadPool pools = ThreadPools.newFixedThreadPool("QueueFake-Counsume", Integer.getInteger("queuefake.thread.pool.max", 8).intValue());
    private Consume consumer;

    public void enqueue(Object obj) {
        if (this.consumer == null) {
            this.queue.offer(obj);
            return;
        }
        synchronized (pools) {
            pools.executeIncludeRequestContext(() -> {
                this.consumer.consume(obj);
            });
        }
    }

    public Consume getConsumer() {
        return this.consumer;
    }

    public void setConsumer(Consume consume) {
        this.consumer = consume;
        synchronized (pools) {
            while (this.queue.size() > 0) {
                Object poll = this.queue.poll();
                if (poll != null) {
                    pools.executeIncludeRequestContext(() -> {
                        consume.consume(poll);
                    });
                }
            }
        }
    }
}
