package kd.bos.schedule.executor;

import com.alibaba.fastjson.JSON;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import kd.bos.instance.Instance;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.mq.MessageAcker;
import kd.bos.mq.MessageConsumer;
import kd.bos.mservice.monitor.HealthLevel;
import kd.bos.mservice.monitor.healthmanage.cluster.ClusterHealth;
import kd.bos.schedule.api.MessageInfo;
import kd.bos.schedule.api.MessageType;
import kd.bos.schedule.api.MessageWatcher;
import kd.bos.schedule.message.RunningTasks;
import kd.bos.schedule.zk.ZkConfig;

@Deprecated
/* loaded from: input_file:kd/bos/schedule/executor/ExecutorMessageCustomer.class */
public class ExecutorMessageCustomer implements MessageConsumer {
    private static final Log log = LogFactory.getLog(ExecutorMessageCustomer.class);
    private static AtomicReference<MessageWatcher> customer = new AtomicReference<>(null);
    private static String waitTime = System.getProperty("schedule.executor.initwaittime", "5000");
    private static long retryTime = 600000;
    private static long interval = 100;

    public void onMessage(Object obj, String str, boolean z, MessageAcker messageAcker) {
        if (!ensureReadyToCustom()) {
            messageAcker.deny(str);
            return;
        }
        try {
            MessageInfo messageInfo = (MessageInfo) JSON.parseObject((String) obj, MessageInfo.class);
            if (!checkServerStatus()) {
                log.error("Schedule***当前节点不健康，调度服务暂不消费消息，休眠3分钟后再尝试消费，消息：{}", messageInfo);
                try {
                    Thread.sleep(180000L);
                } catch (InterruptedException e) {
                    log.error(e);
                }
                messageAcker.deny(str);
                return;
            }
            try {
                int i = 0;
                long j = retryTime / interval;
                long currentTimeMillis = System.currentTimeMillis();
                messageInfo.fetchJobInfo();
                while (true) {
                    boolean isCanRun = isCanRun(messageInfo.getMessageType());
                    if (isCanRun) {
                        synchronized (messageInfo.getMessageType()) {
                            isCanRun = isCanRun(messageInfo.getMessageType());
                            if (isCanRun) {
                                long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                                if (currentTimeMillis2 > retryTime) {
                                    log.info("Schedule***任务在本地节点等待时间超时，返回MQ。message：{},awaitCostTime:{}", messageInfo, Long.valueOf(currentTimeMillis2));
                                    messageAcker.deny(str);
                                } else {
                                    try {
                                        messageAcker.ack(str);
                                        log.info("Schedule***从MQ中接收到消息：{}", messageInfo);
                                        messageInfo.setMessageMQId(str);
                                        customer.get().dispatch(messageInfo, messageAcker);
                                    } catch (Throwable th) {
                                        log.error("Schedule***处理MQ消息, 出现异常：{}", messageInfo, th);
                                    }
                                }
                            }
                        }
                    }
                    if (!isCanRun) {
                        if (i > j) {
                            log.info("Schedule***当前没有空闲的MQ线程消费消息，返回MQ。message：{}", messageInfo);
                            messageAcker.deny(str);
                            break;
                        } else {
                            Thread.sleep(interval);
                            if (i == 0) {
                                log.info("Schedule***当前没有空闲的MQ线程消费消息，将在当前节点尝试，retryTime:{},interval：{},message：{}", new Object[]{Long.valueOf(retryTime), Long.valueOf(interval), messageInfo});
                            }
                        }
                    }
                    i++;
                }
            } catch (Exception e2) {
                log.error("Schedule***处理MQ消息, 出现异常：" + messageInfo, e2);
                messageAcker.deny(str);
            }
        } catch (Exception e3) {
            log.error("Schedule***处理MQ消息：序列化出现异常：", e3);
            messageAcker.ack(str);
        }
    }

    private boolean ensureReadyToCustom() {
        if (customer.get() == null && ExecutorService.getInstance() != null && ExecutorService.getInstance().isStarted()) {
            customer.compareAndSet(null, ExecutorService.getInstance().getObjectFactory().getMessageWatcher());
        }
        boolean z = customer.get() != null;
        if (!z) {
            try {
                Thread.sleep(Integer.parseInt(waitTime));
            } catch (InterruptedException e) {
            }
        }
        return z;
    }

    private boolean isCanRun(MessageType messageType) {
        return RunningTasks.getInstance(messageType).size() < ZkConfig.getMaxNumOfWorkThread(messageType);
    }

    private boolean checkServerStatus() {
        if (Instance.isAppSplit()) {
            if ("false".equalsIgnoreCase(ZkConfig.isCheckServerStatusStr())) {
                return true;
            }
            return innerCheckHealth();
        }
        if ("true".equalsIgnoreCase(ZkConfig.isCheckServerStatusStr())) {
            return innerCheckHealth();
        }
        return true;
    }

    private boolean innerCheckHealth() {
        try {
            boolean z = true;
            if (!isHealth(ClusterHealth.getHealth(Instance.getInstanceId()))) {
                z = false;
            }
            if (!z) {
                boolean z2 = false;
                Map map = (Map) ClusterHealth.getClusterApplicationHealth().get(Instance.getAppName());
                if (map != null && !map.isEmpty()) {
                    Iterator it = map.entrySet().iterator();
                    while (true) {
                        if (!it.hasNext()) {
                            break;
                        }
                        Map.Entry entry = (Map.Entry) it.next();
                        if (isHealth(((Integer) entry.getKey()).intValue()) && ((AtomicInteger) entry.getValue()).get() > 0) {
                            z2 = true;
                            break;
                        }
                    }
                }
                if (!z2) {
                    log.warn("Schedule***当前节点不健康，但当前应用节点没有其他健康的节点，当前节点调度服务仍然处理消息。");
                    z = true;
                }
            }
            return z;
        } catch (Exception e) {
            log.error("Schedule***checkhealth error", e);
            return true;
        }
    }

    private boolean isHealth(int i) {
        return (HealthLevel.ERROR.getLevel() == i || HealthLevel.OVERLOAD.getLevel() == i) ? false : true;
    }
}
