package kd.bos.algox.flink.enhance.krpc.impl;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import kd.bos.algox.flink.enhance.krpc.Actor;
import kd.bos.algox.flink.enhance.krpc.MailBox;
import kd.bos.algox.flink.enhance.krpc.MsgPlus;

/* loaded from: input_file:kd/bos/algox/flink/enhance/krpc/impl/MailBoxImpl.class */
/**
 * Queue-backed {@link MailBox} that feeds pending messages to a single {@link Actor}.
 *
 * <p>Messages are stored in a {@link LinkedBlockingQueue}, so {@link #postMessage(MsgPlus)} may be
 * called from any thread. The {@code isRunning} flag is acquired with a compare-and-set before
 * draining, so at most one thread at a time is inside the drain loop of {@link #process()}.
 */
public class MailBoxImpl implements MailBox {
    /** Pending messages, consumed in FIFO order by {@link #process()}. */
    private final LinkedBlockingQueue<MsgPlus> box = new LinkedBlockingQueue<>();

    /** {@code true} while some thread is draining {@link #box}. */
    private final AtomicBoolean isRunning = new AtomicBoolean(false);

    /** Receiver of every message taken from {@link #box}. */
    private final Actor actor;

    /**
     * Creates a mailbox that delivers its messages to {@code actor}.
     *
     * @param actor the actor whose {@code handleMsgPlus} is invoked for each queued message
     */
    public MailBoxImpl(Actor actor) {
        this.actor = actor;
    }

    /**
     * Reports whether a thread currently holds the processing flag.
     *
     * @return {@code true} if a {@link #process()} call is draining the queue right now
     */
    @Override // kd.bos.algox.flink.enhance.krpc.MailBox
    public boolean isInProcess() {
        return this.isRunning.get();
    }

    /**
     * Appends a message to the queue; it is not handled until {@link #process()} runs.
     *
     * @param msgPlus the message to enqueue
     */
    @Override // kd.bos.algox.flink.enhance.krpc.MailBox
    public void postMessage(MsgPlus msgPlus) {
        this.box.offer(msgPlus);
    }

    /**
     * Drains the queue, handing each message to the actor.
     *
     * <p>Returns immediately if another thread is already draining. After releasing the
     * processing flag the queue is checked again: a message posted between the last empty
     * {@code poll()} and the release would otherwise be stranded, because the poster's own
     * {@code process()} call saw the flag still set and gave up.
     */
    @Override // kd.bos.algox.flink.enhance.krpc.MailBox
    public void process() {
        while (this.isRunning.compareAndSet(false, true)) {
            try {
                drain();
            } finally {
                // Only the thread that won the CAS gets here, so a plain set is sufficient.
                this.isRunning.set(false);
            }
            if (this.box.isEmpty()) {
                return;
            }
            // Something arrived while the flag was still held: try to take over again.
        }
    }

    /** Handles queued messages one by one until the queue is observed empty. */
    private void drain() {
        MsgPlus poll = this.box.poll();
        while (poll != null) {
            try {
                this.actor.handleMsgPlus(poll);
            } catch (Exception e) {
                // A failing message must not stop the ones behind it; report it to the sender.
                DispatcherImpl.log.error("Uncheck exception: " + e.getMessage(), e);
                poll.responseException(e);
            }
            poll = this.box.poll();
        }
    }

    /** No-op: this implementation holds no resources that need releasing. */
    @Override // kd.bos.algox.flink.enhance.krpc.MailBox
    public void shutdown() {
    }
}
