package kd.bos.mq.support;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import kd.bos.context.RequestContext;
import kd.bos.context.RequestContextCreator;
import kd.bos.db.DBRoute;
import kd.bos.instance.Instance;
import kd.bos.metric.Counter;
import kd.bos.metric.MetricSystem;
import kd.bos.mq.MessageConsumer;
import kd.bos.mq.delay.DelayControlManager;
import kd.bos.mq.delay.MetaTime;
import kd.bos.mq.dlx.DLXConfig;
import kd.bos.mq.dlx.DLXMesPubFactory;
import kd.bos.mq.dlx.DLXStrategy;
import kd.bos.mq.dlx.MessageRecord;
import kd.bos.mq.rabbit.ExceptionLogger;
import kd.bos.mq.rocket.ProducerFactory;
import kd.bos.mq.rocket.RocketAcker;
import kd.bos.mq.stat.ConsumerStats;
import kd.bos.rocketmq.RocketmqFactory;
import kd.bos.thread.SetThreadName;
import kd.bos.thread.ThreadLifeCycleManager;
import kd.bos.trace.TraceSpan;
import kd.bos.trace.Tracer;
import kd.bos.trace.util.TraceIdUtil;
import kd.bos.util.StringUtils;
import org.apache.rocketmq.client.producer.MQProducer;

/* loaded from: input_file:kd/bos/mq/support/ConsumerManager.class */
public class ConsumerManager {
    private static final Counter noRequestContextCounter = MetricSystem.counter("kd.metrics.mq.consumer.noRequestContextCounter");
    private static final Counter decodeMessageErrorCounter = MetricSystem.counter("kd.metrics.mq.consumer.decodeMessageErrorCounter");
    private static final Counter handlerMessagerErrorCounter = MetricSystem.counter("kd.metrics.mq.consumer.handlerOnMessagerErrorCounter");
    private static final Counter handlerMessagerTranscationRollBackCounter = MetricSystem.counter("kd.metrics.mq.consumer.handlerMessagerTranscationRollBackCounter");
    private static final Counter handlerMessagerTranscationRepeateCounter = MetricSystem.counter("kd.metrics.mq.consumer.handlerMessagerTranscationRepeateCounter");
    private static final Counter handlerMessagerRepeateCounter = MetricSystem.counter("kd.metrics.mq.consumer.handlerMessagerRepeateCounter");
    private static final Counter consumerounter = MetricSystem.counter("kd.metrics.mq.consumer.Counter");
    private static final Counter activeCounter = MetricSystem.counter("kd.metrics.mq.consumer.activeConsumers");
    private static TimeCacheMap<AtomicInteger> messageRepeatTimes = new TimeCacheMap<>(300);

    public static Counter getConsumerounter() {
        return consumerounter;
    }

    public static void innerHandleDelivery(MessageCommonAcker messageCommonAcker, MessageConsumer messageConsumer, String str, String str2, String str3, int i, byte[] bArr, String str4) {
        boolean z = i > 0;
        try {
            Message message = toMessage(bArr);
            if (renewDelay(str3, message, messageCommonAcker, str, str4)) {
                return;
            }
            if (StringUtils.isNotEmpty(message.getKdtxId())) {
                _handleDtxDelivery(messageConsumer, z, str, str2, str3, message, messageCommonAcker);
                return;
            }
            long innerId = message.getInnerId();
            if (innerId != 0) {
                AtomicInteger atomicInteger = messageRepeatTimes.get(Long.valueOf(innerId));
                if (atomicInteger == null) {
                    atomicInteger = new AtomicInteger(0);
                    messageRepeatTimes.put(Long.valueOf(innerId), atomicInteger);
                }
                if (atomicInteger.getAndIncrement() > 100) {
                    messageCommonAcker.discard(str3);
                    messageRepeatTimes.remove(Long.valueOf(innerId));
                    handlerMessagerRepeateCounter.inc();
                    ExceptionLogger.log("MQError:Repeat send message too many times, auto discard, messageId=" + str3 + ",queue=" + str2);
                    return;
                }
            }
            try {
                try {
                    ThreadLifeCycleManager.start();
                    _handleDelivery(messageConsumer, i, str, str2, str3, message, messageCommonAcker, str4);
                    ThreadLifeCycleManager.end();
                    if (messageCommonAcker.isDenied()) {
                        return;
                    }
                    messageRepeatTimes.remove(Long.valueOf(innerId));
                } catch (Exception e) {
                    if (messageCommonAcker.hasDone()) {
                        ExceptionLogger.log("MQError:handleDelivery uncatched exception, messageId=" + str3, e);
                    } else {
                        int tryTimes = TranscationSupport.instance().tryTimes(String.valueOf(message.getInnerId()));
                        if (tryTimes > 30) {
                            messageCommonAcker.discard(str3);
                            ExceptionLogger.log("MQError:handleDelivery uncatched exception,try times >30, auto discard, messageId=" + str3, e);
                        } else {
                            applyWait(tryTimes);
                            messageCommonAcker.deny(str3);
                            ExceptionLogger.log("MQError:handleDelivery uncatched exception , auto deny, messageId=" + str3, e);
                        }
                    }
                    ThreadLifeCycleManager.end();
                    if (messageCommonAcker.isDenied()) {
                        return;
                    }
                    messageRepeatTimes.remove(Long.valueOf(innerId));
                }
            } catch (Throwable th) {
                ThreadLifeCycleManager.end();
                if (!messageCommonAcker.isDenied()) {
                    messageRepeatTimes.remove(Long.valueOf(innerId));
                }
                throw th;
            }
        } catch (Exception e2) {
            messageCommonAcker.discard(str3);
            decodeMessageErrorCounter.inc();
            ExceptionLogger.log("MQError:toMessage exception, auto discard, queue=" + str2 + ",messageId=" + str3, e2);
        }
    }

    private static boolean renewDelay(String str, Message message, MessageCommonAcker messageCommonAcker, String str2, String str3) {
        int startDeliverTime = (int) ((message.getStartDeliverTime() - System.currentTimeMillis()) / 1000);
        if (message.getStartDeliverTime() <= 0 || startDeliverTime <= 0) {
            return false;
        }
        try {
            MetaTime selectMaxMetaTime = DelayControlManager.selectMaxMetaTime(startDeliverTime);
            byte[] encode = MessageSerde.get().encode(message);
            MQProducer producer = ProducerFactory.getProducer(str2, str3, RocketmqFactory.getRocketInfo(ProducerFactory.getRegionServerKey(str2)));
            org.apache.rocketmq.common.message.Message message2 = new org.apache.rocketmq.common.message.Message(str3, "*", encode);
            message2.setDelayTimeLevel(selectMaxMetaTime.getLevel());
            producer.send(message2);
            messageCommonAcker.discard(str);
            return true;
        } catch (Exception e) {
            messageCommonAcker.deny(str);
            ExceptionLogger.log("publishDelayMessageConfirmModel for queue " + str3 + " error ", e);
            return true;
        }
    }

    private static void _handleDtxDelivery(MessageConsumer messageConsumer, boolean z, String str, String str2, String str3, Message message, MessageCommonAcker messageCommonAcker) {
        boolean z2 = false;
        DBRoute of = DBRoute.of(message.getRouteKey());
        try {
            try {
                ThreadLifeCycleManager.start();
                if (!createTraceAndRequestContext(message)) {
                    messageCommonAcker.discard(str3);
                    KdtxSupport.received(of, message.getKdtxId(), message.getSeq());
                    ExceptionLogger.log("MQError:message has`t requestContext, auto discard, messageId=" + str3);
                    noRequestContextCounter.inc();
                    KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "execute finished", false);
                    if (messageCommonAcker.isAcked() || (messageCommonAcker.isDiscarded() && 1 != 0)) {
                        if (0 != 0) {
                            KdtxSupport.compensateSuccess(message.getKdtxId(), message.getSeq());
                        }
                        KdtxSupport.endConsumer(of, message.getKdtxId());
                    }
                    ThreadLifeCycleManager.end();
                    return;
                }
                boolean isReceived = KdtxSupport.isReceived(of, message.getKdtxId(), message.getSeq());
                if (!isReceived) {
                    try {
                        KdtxSupport.received(of, message.getKdtxId(), message.getSeq());
                    } catch (Exception e) {
                        messageCommonAcker.deny(str3);
                        KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "MQError:message update receive statue error, will requeue", true);
                        noRequestContextCounter.inc();
                        KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "execute finished", false);
                        if (messageCommonAcker.isAcked() || (messageCommonAcker.isDiscarded() && 1 != 0)) {
                            if (0 != 0) {
                                KdtxSupport.compensateSuccess(message.getKdtxId(), message.getSeq());
                            }
                            KdtxSupport.endConsumer(of, message.getKdtxId());
                        }
                        ThreadLifeCycleManager.end();
                        return;
                    }
                } else if (!z) {
                    boolean isManual = KdtxSupport.isManual(message.getKdtxId(), message.getSeq());
                    z2 = isManual;
                    if (!isManual) {
                        messageCommonAcker.discard(str3);
                        KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "execute finished", false);
                        if (messageCommonAcker.isAcked() || (messageCommonAcker.isDiscarded() && 1 != 0)) {
                            if (z2) {
                                KdtxSupport.compensateSuccess(message.getKdtxId(), message.getSeq());
                            }
                            KdtxSupport.endConsumer(of, message.getKdtxId());
                        }
                        ThreadLifeCycleManager.end();
                        return;
                    }
                } else if (KdtxSupport.mqSecondCompensate(message.getKdtxId(), message.getSeq())) {
                    messageCommonAcker.discard(str3);
                    KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "execute finished", false);
                    if (messageCommonAcker.isAcked() || (messageCommonAcker.isDiscarded() && 1 != 0)) {
                        if (0 != 0) {
                            KdtxSupport.compensateSuccess(message.getKdtxId(), message.getSeq());
                        }
                        KdtxSupport.endConsumer(of, message.getKdtxId());
                    }
                    ThreadLifeCycleManager.end();
                    return;
                }
                if (message.getKdtxId().equals(message.getTranscationTag())) {
                    if (TranscationSupport.instance().isRollBack(message.getTranscationTag())) {
                        messageCommonAcker.isDiscarded();
                        KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "execute finished", false);
                        if (messageCommonAcker.isAcked() || (messageCommonAcker.isDiscarded() && 1 != 0)) {
                            if (z2) {
                                KdtxSupport.compensateSuccess(message.getKdtxId(), message.getSeq());
                            }
                            KdtxSupport.endConsumer(of, message.getKdtxId());
                        }
                        ThreadLifeCycleManager.end();
                        return;
                    }
                    if (!TranscationSupport.instance().existXid(message.getRouteKey(), message.getTranscationTag())) {
                        messageCommonAcker.isDiscarded();
                        KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "execute finished", false);
                        if (messageCommonAcker.isAcked() || (messageCommonAcker.isDiscarded() && 0 != 0)) {
                            if (z2) {
                                KdtxSupport.compensateSuccess(message.getKdtxId(), message.getSeq());
                            }
                            KdtxSupport.endConsumer(of, message.getKdtxId());
                        }
                        ThreadLifeCycleManager.end();
                        return;
                    }
                }
                KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), String.format("message[isReceived:%s,isManual:%s] start execute", Boolean.valueOf(isReceived), Boolean.valueOf(z2)), false);
                if (isforbiddenTenantCode(RequestContext.get().getTenantCode())) {
                    LockSupport.parkNanos(1000000000L);
                    messageCommonAcker.deny(str3);
                    KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "execute finished", false);
                    if (messageCommonAcker.isAcked() || (messageCommonAcker.isDiscarded() && 1 != 0)) {
                        if (z2) {
                            KdtxSupport.compensateSuccess(message.getKdtxId(), message.getSeq());
                        }
                        KdtxSupport.endConsumer(of, message.getKdtxId());
                    }
                    ThreadLifeCycleManager.end();
                    return;
                }
                if (DLXConfig.isSendDLX(message.getRequestContext())) {
                    message.setDLXMessage(true);
                    DLXMesPubFactory.getDLXMessagePublisher().sendMessage(str, str2, message, str3, messageCommonAcker);
                    KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "execute finished", false);
                    if (messageCommonAcker.isAcked() || (messageCommonAcker.isDiscarded() && 1 != 0)) {
                        if (z2) {
                            KdtxSupport.compensateSuccess(message.getKdtxId(), message.getSeq());
                        }
                        KdtxSupport.endConsumer(of, message.getKdtxId());
                    }
                    ThreadLifeCycleManager.end();
                    return;
                }
                ConsumerStats.incrementActiveCount(str2, str3);
                doTraceAndConsume(messageConsumer, z, str3, message, messageCommonAcker);
                ConsumerStats.decrementActiveCount(str2, str3);
                if (message.isDLXMessage() && DLXConfig.getDLXStrategy() == DLXStrategy.DEFAULT) {
                    MessageRecord.update(2, message.getInnerId());
                }
                KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "execute finished", false);
                if (messageCommonAcker.isAcked() || (messageCommonAcker.isDiscarded() && 1 != 0)) {
                    if (z2) {
                        KdtxSupport.compensateSuccess(message.getKdtxId(), message.getSeq());
                    }
                    KdtxSupport.endConsumer(of, message.getKdtxId());
                }
                ThreadLifeCycleManager.end();
            } catch (Error | Exception e2) {
                if (!messageCommonAcker.hasDone()) {
                    applyWait(1);
                    messageCommonAcker.deny(str3);
                    KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "common exception auto deny,errorMsg:" + e2.getMessage(), true);
                }
                ExceptionLogger.log("MQError:handleDelivery uncatched exception , auto deny, messageId=" + str3, e2);
                KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "execute finished", false);
                if (messageCommonAcker.isAcked() || (messageCommonAcker.isDiscarded() && 1 != 0)) {
                    if (0 != 0) {
                        KdtxSupport.compensateSuccess(message.getKdtxId(), message.getSeq());
                    }
                    KdtxSupport.endConsumer(of, message.getKdtxId());
                }
                ThreadLifeCycleManager.end();
            }
        } catch (Throwable th) {
            KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "execute finished", false);
            if (messageCommonAcker.isAcked() || (messageCommonAcker.isDiscarded() && 1 != 0)) {
                if (0 != 0) {
                    KdtxSupport.compensateSuccess(message.getKdtxId(), message.getSeq());
                }
                KdtxSupport.endConsumer(of, message.getKdtxId());
            }
            ThreadLifeCycleManager.end();
            throw th;
        }
    }

    private static void _handleDelivery(MessageConsumer messageConsumer, int i, String str, String str2, String str3, Message message, MessageCommonAcker messageCommonAcker, String str4) {
        if (!createTraceAndRequestContext(message)) {
            messageCommonAcker.discard(str3);
            ExceptionLogger.log("MQError:message has`t requestContext, auto discard, messageId=" + str3);
            noRequestContextCounter.inc();
            return;
        }
        if (isforbiddenTenantCode(RequestContext.get().getTenantCode())) {
            LockSupport.parkNanos(1000000000L);
            messageCommonAcker.deny(str3);
            return;
        }
        if (DLXConfig.isSendDLX(message.getRequestContext())) {
            message.setDLXMessage(true);
            DLXMesPubFactory.getDLXMessagePublisher().sendMessage(str, str2, message, str3, messageCommonAcker);
        } else if (isTransactionSubmit(messageConsumer, str3, message, messageCommonAcker, i, str, str4)) {
            ConsumerStats.incrementActiveCount(str2, str3);
            doTraceAndConsume(messageConsumer, i > 0, str3, message, messageCommonAcker);
            ConsumerStats.decrementActiveCount(str2, str3);
            if (message.isDLXMessage() && DLXConfig.getDLXStrategy() == DLXStrategy.DEFAULT) {
                MessageRecord.update(2, message.getInnerId());
            }
        }
    }

    private static boolean isforbiddenTenantCode(String str) {
        String[] split;
        if (Instance.isPausedServiceByMonitor()) {
            return true;
        }
        String property = System.getProperty("mq.consume.forbidden.tenantcodes");
        if (property == null || (split = property.split(",|;")) == null) {
            return false;
        }
        for (String str2 : split) {
            if (str.equalsIgnoreCase(str2)) {
                return true;
            }
        }
        return false;
    }

    private static boolean isTransactionSubmit(MessageConsumer messageConsumer, String str, Message message, MessageCommonAcker messageCommonAcker, int i, String str2, String str3) {
        boolean existXid;
        String transcationTag = message.getTranscationTag();
        if (transcationTag == null) {
            return true;
        }
        int i2 = 0;
        do {
            existXid = TranscationSupport.instance().existXid(message.getRouteKey(), transcationTag);
            i2++;
            if (3 == i2) {
                break;
            }
            applyWait(i2);
        } while (!existXid);
        if (!existXid) {
            waitDeal(str, message, messageCommonAcker, transcationTag, i, str2, str3);
            return false;
        }
        String routeKey = message.getRouteKey();
        String routeKey2 = messageConsumer.getRouteKey();
        if (routeKey2 == null) {
            routeKey2 = routeKey;
        }
        if (TranscationSupport.instance().existConsumedId(routeKey2, transcationTag)) {
            messageCommonAcker.discard(str);
            handlerMessagerTranscationRepeateCounter.inc();
            return false;
        }
        String str4 = routeKey2;
        messageCommonAcker.setAckedCallBack(() -> {
            try {
                TranscationSupport.instance().insertConsumed(str4, transcationTag);
                TranscationSupport.instance().deleteXid(routeKey, transcationTag);
            } catch (Exception e) {
                ExceptionLogger.log("mq TranscationSupport error of acktion" + str, e);
            }
        });
        return true;
    }

    private static void waitDeal(String str, Message message, MessageCommonAcker messageCommonAcker, String str2, int i, String str3, String str4) {
        if (message.getRetryTimes() != -1) {
            delayStrategy(str, message, messageCommonAcker, i, str3, str4);
        } else {
            timesStrategy(str, message, messageCommonAcker, str2);
        }
    }

    private static void delayStrategy(String str, Message message, MessageCommonAcker messageCommonAcker, int i, String str2, String str3) {
        if (i >= 14) {
            messageCommonAcker.discard(str);
        } else {
            messageCommonAcker.deny(str);
        }
    }

    private static void timesStrategy(String str, Message message, MessageCommonAcker messageCommonAcker, String str2) {
        int tryTimes = TranscationSupport.instance().tryTimes(str2);
        if (isMessageTimeout(message)) {
            messageCommonAcker.discard(str);
            handlerMessagerTranscationRollBackCounter.inc();
        } else {
            applyWait(tryTimes);
            messageCommonAcker.deny(str);
        }
    }

    private static boolean isMessageTimeout(Message message) {
        return System.currentTimeMillis() - message.getMessageTime() > ((long) Integer.parseInt(System.getProperty("mq.trasncation.message.waittime", "300"))) * 1000;
    }

    private static void doTraceAndConsume(MessageConsumer messageConsumer, boolean z, String str, Message message, MessageCommonAcker messageCommonAcker) {
        TraceSpan create = Tracer.create("MQConsumer", "onMessage", true);
        Throwable th = null;
        try {
            create.addTag("messageId", str);
            create.addTag("resent", Boolean.toString(z));
            try {
                try {
                    activeCounter.inc();
                    messageConsumer.onMessage(message.getBody(), str, z, messageCommonAcker);
                    if (messageCommonAcker instanceof RocketAcker) {
                        ((RocketAcker) messageCommonAcker).setBizInvoked(true);
                    }
                    if (!messageCommonAcker.hasDone()) {
                        messageCommonAcker.ack(str);
                    }
                    activeCounter.dec();
                } catch (Exception e) {
                    if (messageCommonAcker.hasDone()) {
                        ExceptionLogger.log("MQError:onMessage uncatched exception, messageId=" + str, e);
                    } else {
                        messageCommonAcker.discard(str);
                        handlerMessagerErrorCounter.inc();
                        ExceptionLogger.log("MQError:onMessage uncatched exception, auto discard, messageId=" + str, e);
                    }
                    activeCounter.dec();
                }
                if (create != null) {
                    if (0 == 0) {
                        create.close();
                        return;
                    }
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                activeCounter.dec();
                throw th3;
            }
        } catch (Throwable th4) {
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    create.close();
                }
            }
            throw th4;
        }
    }

    private static boolean createTraceAndRequestContext(Message message) {
        RequestContext requestContext = message.getRequestContext();
        if (requestContext == null) {
            return false;
        }
        String createTraceIdString = TraceIdUtil.createTraceIdString();
        SetThreadName.setTraceId(createTraceIdString);
        requestContext.setTraceId(createTraceIdString);
        RequestContextCreator.restoreForMQ(requestContext);
        return true;
    }

    private static void applyWait(int i) {
        if (i <= 1) {
            LockSupport.parkNanos(100000000L);
        } else {
            LockSupport.parkNanos(1000000000 * i);
        }
    }

    private static Message toMessage(byte[] bArr) {
        return MessageSerde.get().decode(bArr);
    }
}
