package kd.bos.schedule.executor;

import com.alibaba.fastjson.JSON;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import kd.bos.context.RequestContext;
import kd.bos.instance.Instance;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.mq.MessageAcker;
import kd.bos.mq.MessageConsumer;
import kd.bos.mservice.monitor.HealthLevel;
import kd.bos.mservice.monitor.healthmanage.cluster.ClusterHealth;
import kd.bos.schedule.api.JobInfo;
import kd.bos.schedule.api.JobType;
import kd.bos.schedule.api.MessageInfo;
import kd.bos.schedule.api.MessageType;
import kd.bos.schedule.api.MessageWatcher;
import kd.bos.schedule.api.ScheduleInfo;
import kd.bos.schedule.api.ScheduleTypeNextEnums;
import kd.bos.schedule.message.ReadyTasks;
import kd.bos.schedule.message.RunningTasks;
import kd.bos.schedule.next.observable.util.SchObservableCollectData;
import kd.bos.schedule.utils.RequestContextUtils;
import kd.bos.schedule.zk.ZkConfig;
import kd.bos.threads.ThreadPools;

/* loaded from: input_file:kd/bos/schedule/executor/SchExecutorMessageCustomer.class */
/**
 * MQ consumer for schedule executor messages.
 *
 * <p>Each incoming message is parsed into a {@link MessageInfo}, offered to the
 * {@link ReadyTasks} queue of its message type and then acknowledged. One {@link Work}
 * thread per message type drains that queue and hands the messages to the
 * {@link MessageWatcher} obtained from the executor service.
 *
 * <p>A message is denied (returned to MQ) when the executor is not ready yet, when the
 * node health check fails, when the ready queue does not accept it, or when preparing the
 * worker fails. A message that cannot be parsed or prepared is acknowledged and reported
 * as a receive error.
 */
public class SchExecutorMessageCustomer implements MessageConsumer {
    private static final Log log = LogFactory.getLog(SchExecutorMessageCustomer.class);

    /** Watcher that dispatches messages; stays {@code null} until the executor service is started. */
    private static final AtomicReference<MessageWatcher> customer = new AtomicReference<>(null);

    /** Pause, in milliseconds, applied before denying a message while the watcher is not available. */
    private static final long initWaitMillis = resolveInitWaitMillis();

    /** Time budget, in milliseconds, handed to {@code ReadyTasks.in} when enqueueing a message. */
    private static final long retryTime = 600000L;

    /** Pause, in milliseconds, applied before denying a message while the health check fails. */
    private static final long unhealthyBackoffMillis = 180000L;

    /** Message types for which a worker has already been submitted (used as a set). */
    private static final Map<MessageType, MessageType> workInitMap = new ConcurrentHashMap<>(8);

    /**
     * Reads {@code schedule.executor.initwaittime} once and validates it.
     *
     * @return the configured wait in milliseconds, or 5000 when the property is missing,
     *         not an integer, or negative
     */
    private static long resolveInitWaitMillis() {
        String configured = System.getProperty("schedule.executor.initwaittime", "5000");
        try {
            int millis = Integer.parseInt(configured);
            if (millis >= 0) {
                return millis;
            }
        } catch (NumberFormatException e) {
            // Reported below together with the negative-value case.
        }
        log.warn("Schedule***invalid schedule.executor.initwaittime, fallback to 5000ms. value:" + configured);
        return 5000L;
    }

    /**
     * Worker that drains the ready queue of a single message type and dispatches each
     * message once the number of running tasks is below the configured maximum.
     */
    public static class Work extends Thread {
        private final MessageType messageType;

        Work(MessageType messageType) {
            this.messageType = messageType;
        }

        /**
         * Takes messages from the ready queue until {@code take()} returns {@code null}
         * or any throwable escapes, in which case the error is logged and the loop ends.
         *
         * <p>NOTE(review): nothing removes this type from {@code workInitMap} when the loop
         * ends, so {@code initWork} will not submit a replacement worker — confirm intended.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                log.info("Schedule***messageType:{},Work Thread begin to run", this.messageType);
                ReadyTasks readyTasks = ReadyTasks.getInstance(this.messageType);
                MessageInfo next;
                while ((next = readyTasks.take()) != null) {
                    awaitFreeSlot();
                    doExecute(next);
                }
            } catch (Throwable th) {
                log.error("Schedule***messageType:{},Work Thread error", this.messageType, th);
            }
        }

        /**
         * Blocks until {@link #beginRun()} reports a free slot. The pause between checks
         * starts at 0 ms and grows by 100 ms per attempt up to 10 seconds.
         */
        private void awaitFreeSlot() {
            int step = 0;
            while (!beginRun()) {
                try {
                    Thread.sleep(step * 100L);
                } catch (InterruptedException ignored) {
                    // Interruption is not propagated: restoring the flag would make every
                    // following sleep throw immediately and turn this back-off into a busy
                    // spin, while the message already taken still has to be dispatched.
                } catch (Throwable th) {
                    log.error("Schedule***messageType:{},Work Thread take message error", this.messageType, th);
                }
                if (step < 100) {
                    step++;
                }
            }
        }

        /** @return {@code true} when fewer tasks are running than the configured maximum for this type */
        private boolean beginRun() {
            return RunningTasks.getInstance(this.messageType).size() < ZkConfig.getMaxNumOfWorkThread(this.messageType);
        }

        /** Dispatches one message through the watcher; failures are logged and never rethrown. */
        private void doExecute(MessageInfo messageInfo) {
            try {
                customer.get().dispatch(messageInfo, (MessageAcker) null);
            } catch (Throwable th) {
                log.error("Schedule*** messageInfo doExecute fail. messageinfo:{}", messageInfo, th);
            }
        }
    }

    /**
     * Handles one message delivered by MQ.
     *
     * @param obj          message payload; cast to {@code String} and parsed as JSON into a {@link MessageInfo}
     * @param str          MQ message id, stored on the message and used for ack/deny
     * @param z            resend flag reported by MQ; only logged
     * @param messageAcker callback used to acknowledge or deny the message
     */
    public void onMessage(Object obj, String str, boolean z, MessageAcker messageAcker) {
        long receivedAt = System.currentTimeMillis();
        if (!ensureReadyToCustom()) {
            messageAcker.deny(str);
            return;
        }
        try {
            MessageInfo messageInfo = JSON.parseObject((String) obj, MessageInfo.class);
            if (!checkServerStatus()) {
                backOffAndDeny(messageInfo, str, messageAcker);
                return;
            }
            messageInfo.setMessageMQId(str);
            RequestContextUtils.fillContext(messageInfo, RequestContext.get());
            JobInfo jobInfo = messageInfo.fetchJobInfo();
            try {
                initWork(messageInfo.getMessageType());
                if (isCanRun(messageInfo)) {
                    acknowledgeAndRecord(messageInfo, jobInfo, str, z, messageAcker, receivedAt);
                } else {
                    log.info("Schedule***当前就绪队列已满，且已在当前节点尝试{}分钟无法加入队列，消息返回MQ。message：{}", Long.valueOf((retryTime / 60) / 1000), messageInfo);
                    messageAcker.deny(str);
                }
            } catch (Exception e) {
                log.error("Schedule***处理MQ消息, 出现异常：{}", messageInfo, e);
                messageAcker.deny(str);
            }
        } catch (Exception e) {
            // Anything failing before the message is queued ends up here; the message is
            // acknowledged (not redelivered) and reported as a receive error.
            log.error("Schedule***处理MQ消息：序列化出现异常：", e);
            messageAcker.ack(str);
            HashMap<String, Object> metrics = new HashMap<>(2);
            metrics.put("receiveMessageTime", System.currentTimeMillis() - receivedAt);
            RequestContext context = RequestContext.get();
            // Second argument is the account id, matching the success path.
            SchObservableCollectData.collectData(context.getTenantId(), context.getAccountId(), "Client", "receiveMessageError", metrics);
        }
    }

    /**
     * Logs that the node is unhealthy, pauses for {@code unhealthyBackoffMillis} and then
     * denies the message. If the pause is interrupted the interrupt flag is restored.
     */
    private void backOffAndDeny(MessageInfo messageInfo, String messageId, MessageAcker messageAcker) {
        log.error("Schedule***当前节点不健康，调度服务暂不消费消息，休眠3分钟后再尝试消费，消息：{}", messageInfo);
        try {
            Thread.sleep(unhealthyBackoffMillis);
        } catch (InterruptedException e) {
            log.error(e);
            Thread.currentThread().interrupt();
        }
        messageAcker.deny(messageId);
    }

    /**
     * Acknowledges a message that was accepted by the ready queue and records receive
     * metrics. Any throwable is logged and swallowed because the message is already queued.
     */
    private void acknowledgeAndRecord(MessageInfo messageInfo, JobInfo jobInfo, String messageId, boolean resend,
            MessageAcker messageAcker, long receivedAt) {
        try {
            messageAcker.ack(messageId);
            log.info("Schedule***从MQ中接收到消息：{},resend：{}", messageInfo, Boolean.valueOf(resend));
            HashMap<String, Object> metrics = new HashMap<>(2);
            metrics.put("receiveMessageTime", System.currentTimeMillis() - receivedAt);
            metrics.put("receiveMessageInfo", messageInfo);
            RequestContext context = RequestContext.get();
            SchObservableCollectData.collectData(context.getTenantId(), context.getAccountId(), "Client", "receiveMessage", metrics);
            if (jobInfo.getJobType() == JobType.DETECT) {
                ScheduleInfo scheduleInfo = new ScheduleInfo();
                scheduleInfo.setTenantId(messageInfo.getTenantId());
                scheduleInfo.setAccountId(messageInfo.getAccountId());
                scheduleInfo.setScheduleType(ScheduleTypeNextEnums.Detect);
                scheduleInfo.setJobInfo(jobInfo);
                SchObservableCollectData.collectData(messageInfo.getTenantId(), messageInfo.getAccountId(), "Client", "receiveTaskSuccess", scheduleInfo);
            }
        } catch (Throwable th) {
            log.error("Schedule***处理MQ消息, 出现异常：{}", messageInfo, th);
        }
    }

    /**
     * Resolves the message watcher from the executor service if it has been started.
     *
     * @return {@code true} when a watcher is available; otherwise pauses for
     *         {@code initWaitMillis} and returns {@code false}
     */
    private boolean ensureReadyToCustom() {
        if (customer.get() == null) {
            ExecutorService executor = ExecutorService.getInstance();
            if (executor != null && executor.isStarted()) {
                customer.compareAndSet(null, executor.getObjectFactory().getMessageWatcher());
            }
        }
        if (customer.get() != null) {
            return true;
        }
        try {
            Thread.sleep(initWaitMillis);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the thread that owns this consumer.
            Thread.currentThread().interrupt();
        }
        return false;
    }

    /** @return whether the ready queue of the message's type accepted it within {@code retryTime} */
    private boolean isCanRun(MessageInfo messageInfo) {
        return ReadyTasks.getInstance(messageInfo.getMessageType()).in(messageInfo, retryTime);
    }

    /**
     * Decides whether this node may consume messages. With app split the health check runs
     * unless the configured switch is "false"; without app split it runs only when the
     * switch is "true". When the check does not run the node is treated as available.
     */
    private boolean checkServerStatus() {
        boolean appSplit = Instance.isAppSplit();
        String configured = ZkConfig.isCheckServerStatusStr();
        boolean checkEnabled = appSplit
                ? !"false".equalsIgnoreCase(configured)
                : "true".equalsIgnoreCase(configured);
        return !checkEnabled || innerCheckHealth();
    }

    /**
     * Health check for the current instance.
     *
     * @return {@code false} only when this instance is unhealthy and the application has at
     *         least one instance at a healthy level; {@code true} otherwise, including when
     *         the check itself fails
     */
    private boolean innerCheckHealth() {
        try {
            if (isHealth(ClusterHealth.getHealth(Instance.getInstanceId()))) {
                return true;
            }
            if (hasHealthyInstanceInApp()) {
                return false;
            }
            log.warn("Schedule***当前节点不健康，但当前应用节点没有其他健康的节点，当前节点调度服务仍然处理消息。");
            return true;
        } catch (Exception e) {
            log.error("Schedule***checkhealth error", e);
            return true;
        }
    }

    /**
     * Scans the cluster health entries of the current application, keyed by health level
     * with an instance counter as value.
     *
     * @return {@code true} when some healthy level has a counter greater than zero
     */
    private boolean hasHealthyInstanceInApp() {
        Map<?, ?> countsByLevel = (Map<?, ?>) ClusterHealth.getClusterApplicationHealth().get(Instance.getAppName());
        if (countsByLevel == null || countsByLevel.isEmpty()) {
            return false;
        }
        for (Map.Entry<?, ?> entry : countsByLevel.entrySet()) {
            if (isHealth(((Integer) entry.getKey()).intValue()) && ((AtomicInteger) entry.getValue()).get() > 0) {
                return true;
            }
        }
        return false;
    }

    /** @return {@code true} unless the level equals {@code HealthLevel.ERROR} or {@code HealthLevel.OVERLOAD} */
    private boolean isHealth(int i) {
        return HealthLevel.ERROR.getLevel() != i && HealthLevel.OVERLOAD.getLevel() != i;
    }

    /**
     * Submits the worker for a message type the first time that type is seen. The type is
     * recorded only after the submission returned, so a failed submission is retried on the
     * next message.
     */
    private void initWork(MessageType messageType) {
        if (workInitMap.containsKey(messageType)) {
            return;
        }
        synchronized (workInitMap) {
            if (!workInitMap.containsKey(messageType)) {
                String workerName = "BOSSchedule-Executor-Worker-" + messageType;
                ThreadPools.executeOnce(workerName, new Work(messageType));
                workInitMap.put(messageType, messageType);
                log.info("Schedule*** initworker threadpool" + workerName);
            }
        }
    }
}
