package kd.bos.schedule.message.mq;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import kd.bos.context.RequestContext;
import kd.bos.dc.api.model.Account;
import kd.bos.dc.utils.AccountUtils;
import kd.bos.exception.KDException;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.schedule.api.MessageHandler;
import kd.bos.schedule.api.MessageInfo;
import kd.bos.schedule.api.ObjectFactory;
import kd.bos.schedule.api.Subscriber;
import kd.bos.schedule.api.TaskDao;
import kd.bos.schedule.api.TaskInfo;
import kd.bos.schedule.api.TaskStatusConstant;
import kd.bos.schedule.message.MessageCreator;
import kd.bos.schedule.message.MessageTitleConstant;
import kd.bos.schedule.utils.RequestContextUtils;
import kd.bos.schedule.zk.ZkConfig;
import kd.bos.threads.ThreadPools;
import org.apache.curator.shaded.com.google.common.base.Objects;

/* loaded from: input_file:kd/bos/schedule/message/mq/MQSubscriber.class */
public class MQSubscriber implements Subscriber {
    private static final Log log = LogFactory.getLog(MQSubscriber.class);
    ObjectFactory objectFactory;
    Thread thread;
    private static final String TWO_PLACEHOLDER = "%s#%s";
    ConcurrentHashMap<String, MessageHandler> subscriberContain = new ConcurrentHashMap<>();
    AtomicBoolean isStart = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kd/bos/schedule/message/mq/MQSubscriber$Monitor.class */
    public class Monitor implements Runnable {
        private Monitor() {
        }

        public void execute() {
            MQSubscriber.log.info("ScheduleMonitor is running.");
            while (MQSubscriber.this.isStart.get()) {
                int circle4JobCallBack = ZkConfig.getCircle4JobCallBack() * 1000;
                long currentTimeMillis = System.currentTimeMillis();
                try {
                    oneCycle();
                } catch (Exception e) {
                    MQSubscriber.log.error(e);
                }
                long currentTimeMillis2 = circle4JobCallBack - (System.currentTimeMillis() - currentTimeMillis);
                if (currentTimeMillis2 > 0) {
                    try {
                        Thread.sleep(currentTimeMillis2);
                    } catch (InterruptedException e2) {
                        MQSubscriber.log.error(e2);
                    }
                }
            }
        }

        public void oneCycle() {
            TaskDao taskDao = MQSubscriber.this.objectFactory.getTaskDao();
            for (Account account : AccountUtils.getAllAccountsOfCurrentEnv()) {
                String accountId = account.getAccountId();
                List list = (List) MQSubscriber.this.subscriberContain.keySet().stream().filter(str -> {
                    return Objects.equal(accountId, str.split("#")[0]);
                }).map(str2 -> {
                    return str2.split("#")[1];
                }).collect(Collectors.toList());
                if (!list.isEmpty()) {
                    try {
                        RequestContextUtils.createRequestContext(account.getTenantId(), accountId, null);
                        Iterator it = taskDao.get(list).iterator();
                        while (it.hasNext()) {
                            processTaskCallBack((TaskInfo) it.next());
                        }
                    } catch (Exception e) {
                        MQSubscriber.log.error(e);
                    }
                }
            }
        }

        private void processTaskCallBack(TaskInfo taskInfo) {
            String accountId = RequestContext.get().getAccountId();
            String id = taskInfo.getId();
            MessageInfo createStatusMessage = MessageCreator.createStatusMessage(taskInfo);
            AsynCallBackExecutor.get().executeCallBack(MQSubscriber.this.subscriberContain.get(String.format(MQSubscriber.TWO_PLACEHOLDER, accountId, id)), createStatusMessage);
            if (MessageTitleConstant.TASK_STATUS.equals(createStatusMessage.getTitle()) && TaskStatusConstant.isEnd(taskInfo.getStatus())) {
                MQSubscriber.this.unSubscribe(id);
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            execute();
        }
    }

    public void start() {
        if (this.isStart.get()) {
            return;
        }
        ThreadPools.executeOnce("BOSSchedule-jobClient-monitor", new Monitor());
        this.isStart.compareAndSet(false, true);
    }

    public void stop() {
        if (this.isStart.get()) {
            this.isStart.compareAndSet(true, false);
        }
    }

    public void subscribe(String str, MessageHandler messageHandler) throws KDException {
        if (!this.isStart.get()) {
            start();
        }
        this.subscriberContain.put(String.format(TWO_PLACEHOLDER, RequestContext.get().getAccountId(), str), messageHandler);
    }

    public void unSubscribe(String str) {
        this.subscriberContain.remove(String.format(TWO_PLACEHOLDER, RequestContext.get().getAccountId(), str));
    }

    public void setObjectFactory(ObjectFactory objectFactory) {
        this.objectFactory = objectFactory;
    }
}
