package kd.wtc.wtbs.business.task.executor.center;

import java.util.Map;
import java.util.concurrent.locks.LockSupport;
import kd.bos.context.RequestContext;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.mq.MessageAcker;
import kd.bos.mq.MessageConsumer;
import kd.wtc.wtbs.business.task.base.ErrorMsgUtils;
import kd.wtc.wtbs.business.task.base.ShardingTaskExLog;
import kd.wtc.wtbs.business.task.base.ShardingTaskProgressReporter;
import kd.wtc.wtbs.business.task.base.ShardingTaskProgressReporterImpl;
import kd.wtc.wtbs.business.task.base.TaskConfig;
import kd.wtc.wtbs.business.task.base.TaskRepositoryImpl;
import kd.wtc.wtbs.business.task.common.WTCTaskParamKeys;
import kd.wtc.wtbs.business.task.executor.WTCShardingAbleTask;
import kd.wtc.wtbs.common.util.WTCStringUtils;

/* loaded from: input_file:kd/wtc/wtbs/business/task/executor/center/WTCTaskConsumer.class */
public class WTCTaskConsumer implements MessageConsumer {
    protected static final Log log = LogFactory.getLog(WTCTaskConsumer.class);
    private static long PARK_NANOS_BEFORE_DENY_MESSAGE = Long.parseLong(System.getProperty("wtc.task.st.park.nanos.befoer.deny", "5000000000"));
    private static volatile boolean BASED_RELIABLE_CHANNEL = Boolean.parseBoolean(System.getProperty("wtc.task.st.enable.reliable.channel", "true"));
    private static final ShardingTaskProgressReporter stReporter = new ShardingTaskProgressReporterImpl(TaskRepositoryImpl.getInstance());

    public void onMessage(Object obj, String str, boolean z, MessageAcker messageAcker) {
        Map<String, Object> map = (Map) obj;
        long longValue = ((Long) map.getOrDefault(WTCTaskParamKeys.WTCShardingTaskId, 0L)).longValue();
        log.info("WTCTaskConsumer accept task[shardingTaskId={}] and then begin parse param.", Long.valueOf(longValue));
        String valueOf = String.valueOf(map.get("category"));
        if (WTCStringUtils.isEmpty(valueOf)) {
            String format = String.format("WTCTaskConsumer detected message illegal, the message field category=%s", valueOf);
            log.info(format);
            reportError(longValue, format);
            messageAcker.deny(str);
            return;
        }
        try {
            WTCShardingAbleTask shardingTaskExecutor = TaskConfig.getShardingTaskExecutor(valueOf);
            if (shardingTaskExecutor == null) {
                String format2 = String.format("WTCTaskConsumer can not find ShardingTaskExecutor by category=%s", valueOf);
                log.info(format2);
                reportError(longValue, format2);
                messageAcker.deny(str);
                return;
            }
            if (!WTCShardingExecutorManager.tryExecute(longValue, shardingTaskExecutor)) {
                log.info("WTCTaskConsumer tryExecute shardingTask[shardingTaskId={}] fail, now deny it.", Long.valueOf(longValue));
                LockSupport.parkNanos(PARK_NANOS_BEFORE_DENY_MESSAGE);
                messageAcker.deny(str);
                return;
            }
            try {
                if (BASED_RELIABLE_CHANNEL) {
                    shardingTaskExecutor.setMessageAckInfo(messageAcker, str);
                } else {
                    messageAcker.ack(str);
                }
                log.info("WTCTaskConsumer all ready execute shardingTask[shardingTaskId={}]", Long.valueOf(longValue));
                shardingTaskExecutor.execute(RequestContext.get(), map, null);
                WTCShardingExecutorManager.end(longValue);
            } catch (Throwable th) {
                WTCShardingExecutorManager.end(longValue);
                throw th;
            }
        } catch (Throwable th2) {
            String str2 = "WTCTaskConsumer getShardingTaskExecutor error.\n" + ErrorMsgUtils.getStackTraceMessage(th2, 50);
            log.info(str2);
            reportError(longValue, str2);
            messageAcker.deny(str);
        }
    }

    private static void reportError(long j, String str) {
        ShardingTaskExLog shardingTaskExLog = new ShardingTaskExLog();
        shardingTaskExLog.setShardingTaskId(j);
        shardingTaskExLog.setExStr(str);
        stReporter.reportError(shardingTaskExLog);
    }

    public static boolean isBasedReliableChannel() {
        return BASED_RELIABLE_CHANNEL;
    }

    public static void setBasedReliableChannel(boolean z) {
        BASED_RELIABLE_CHANNEL = z;
    }
}
