package kd.bos.schedule.server.realtime;

import java.util.ArrayList;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import kd.bos.context.RequestContext;
import kd.bos.dataentity.TypesContainer;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.mq.MessageAcker;
import kd.bos.schedule.api.Executor;
import kd.bos.schedule.api.MessageHandler;
import kd.bos.schedule.api.MessageInfo;
import kd.bos.schedule.api.MessageType;
import kd.bos.schedule.api.ObjectFactory;
import kd.bos.schedule.message.AbstractService;
import kd.bos.schedule.message.JobProcessor;
import kd.bos.schedule.message.mq.MQObjectFactory;
import kd.bos.schedule.next.observable.util.SchObservableCollectData;
import kd.bos.schedule.utils.RequestContextUtils;

/* loaded from: input_file:kd/bos/schedule/server/realtime/RealtimeExecutor.class */
/**
 * Executor that dispatches incoming schedule messages to the handlers registered for their
 * {@link MessageType}.
 *
 * <p>Handlers can be registered in two ways:
 * <ul>
 *   <li>as a ready-made instance, which is stored in {@link #messageHandlePool} and reused for
 *       every message of that type;</li>
 *   <li>as a class, which is stored in {@link #messageHandleClassPool} and instantiated through
 *       {@link TypesContainer#createInstance} each time handlers are looked up.</li>
 * </ul>
 *
 * <p>The constructor registers {@code RealtimeJobHandler} for {@link MessageType#REALTIMEJOB}.
 *
 * <p>NOTE(review): the registration pools are plain collections and {@link #lock} is not acquired
 * anywhere in this class, so concurrent registration and dispatch are not guarded here.
 */
public class RealtimeExecutor extends AbstractService implements Executor {
    public static final String SCHEDULE_EXECUTOR_NUMOFWORKTHREAD_KEY = "Schedule.Executor.NumOfWorkThread";
    private static final Log log = LogFactory.getLog(RealtimeExecutor.class);

    /** Handler instances registered per message type. */
    protected Map<MessageType, List<MessageHandler>> messageHandlePool = new EnumMap<>(MessageType.class);

    /** Handler classes registered per message type; instantiated on every lookup. */
    protected Map<MessageType, List<Class<? extends MessageHandler>>> messageHandleClassPool = new EnumMap<>(MessageType.class);

    /** Not used by this class itself; kept for subclasses that may rely on it. */
    protected ReentrantLock lock = new ReentrantLock();

    /** Creates the executor and registers the default handler class for real-time jobs. */
    public RealtimeExecutor() {
        registHandler(MessageType.REALTIMEJOB, RealtimeJobHandler.class);
    }

    /**
     * Registers a handler instance for the given message type.
     *
     * <p>The handler receives this executor's current object factory before it is stored.
     * Registering a handler that is already present (per {@code List.contains}) is a no-op.
     *
     * @param messageType    type of message the handler should receive
     * @param messageHandler handler instance to register
     */
    public void registHandler(MessageType messageType, MessageHandler messageHandler) {
        log.info("Schedule***handler在执行中注册：" + messageHandler.getClass().getSimpleName());
        messageHandler.setObjectFactory(this.objectFactory);
        List<MessageHandler> handlers = this.messageHandlePool.get(messageType);
        if (handlers == null) {
            handlers = new ArrayList<>();
            this.messageHandlePool.put(messageType, handlers);
        }
        if (!handlers.contains(messageHandler)) {
            handlers.add(messageHandler);
        }
    }

    /**
     * Removes a previously registered handler instance; does nothing if it is not registered.
     *
     * @param messageType    type the handler was registered for
     * @param messageHandler handler instance to remove
     */
    public void unRegistHandler(MessageType messageType, MessageHandler messageHandler) {
        List<MessageHandler> handlers = this.messageHandlePool.get(messageType);
        if (handlers != null) {
            // List.remove(Object) is already a no-op when the element is absent.
            handlers.remove(messageHandler);
        }
    }

    /**
     * Registers a handler class for the given message type.
     *
     * <p>Registering a class that is already present is a no-op.
     *
     * @param messageType type of message the handler should receive
     * @param cls         handler class to register
     */
    public void registHandler(MessageType messageType, Class<? extends MessageHandler> cls) {
        log.info("Schedule***handler在执行中注册：" + cls.getSimpleName());
        List<Class<? extends MessageHandler>> handlerClasses = this.messageHandleClassPool.get(messageType);
        if (handlerClasses == null) {
            handlerClasses = new ArrayList<>();
            this.messageHandleClassPool.put(messageType, handlerClasses);
        }
        if (!handlerClasses.contains(cls)) {
            handlerClasses.add(cls);
        }
    }

    /**
     * Removes a previously registered handler class; does nothing if it is not registered.
     *
     * @param messageType type the class was registered for
     * @param cls         handler class to remove
     */
    public void unRegistHandler(MessageType messageType, Class<? extends MessageHandler> cls) {
        List<Class<? extends MessageHandler>> handlerClasses = this.messageHandleClassPool.get(messageType);
        if (handlerClasses != null) {
            handlerClasses.remove(cls);
        }
    }

    /**
     * Collects the handlers for a message type.
     *
     * <p>The result contains one newly created instance per registered handler class, followed by
     * the registered handler instances. A class that fails to instantiate is logged and skipped.
     *
     * @param messageType type of message to look up
     * @return a new list of handlers; empty when nothing is registered for the type
     */
    public List<MessageHandler> getMessageHandler(MessageType messageType) {
        List<MessageHandler> result = new ArrayList<>();
        List<Class<? extends MessageHandler>> handlerClasses = this.messageHandleClassPool.get(messageType);
        if (handlerClasses != null) {
            for (Class<? extends MessageHandler> handlerClass : handlerClasses) {
                try {
                    result.add((MessageHandler) TypesContainer.createInstance(handlerClass));
                } catch (Exception e) {
                    // Best effort: one broken handler class must not block the others.
                    log.error(e);
                }
            }
        }
        List<MessageHandler> handlerInstances = this.messageHandlePool.get(messageType);
        if (handlerInstances != null) {
            result.addAll(handlerInstances);
        }
        return result;
    }

    /**
     * Dispatches a message to every handler registered for its type.
     *
     * <p>When no handler is registered, the situation is logged and reported through
     * {@link SchObservableCollectData} and the message is not processed. Otherwise the request
     * context is filled from the message and one {@link JobProcessor} per handler is handed to the
     * thread pool of the message type.
     *
     * @param messageInfo  message to process
     * @param messageAcker acker supplied by the caller; not used by this method
     */
    public void processMessage(MessageInfo messageInfo, MessageAcker messageAcker) {
        MessageType messageType = messageInfo.getMessageType();
        List<MessageHandler> handlers = getMessageHandler(messageType);
        if (handlers.isEmpty()) {
            messageInfo.fetchJobInfo();
            log.error("Schedule***没有注册对应的handler,无法处理这个类型的消息:{}", messageInfo);
            SchObservableCollectData.collectData(messageInfo.getTenantId(), messageInfo.getAccountId(), "Client", "haveNoMessageHandlers", messageInfo);
            return;
        }
        RequestContextUtils.fillContext(messageInfo, RequestContext.get());
        for (MessageHandler handler : handlers) {
            handler.setObjectFactory(getObjectFactory());
            // A fresh processor per handler: a single shared instance would be re-pointed at the
            // next handler while an earlier submission may still be queued or running.
            JobProcessor jobProcessor = new JobProcessor();
            jobProcessor.setObjectFactory(this.objectFactory);
            jobProcessor.setHandler(handler);
            jobProcessor.setMessage(messageInfo);
            getThreadPool(messageType).execute(jobProcessor);
        }
    }

    /**
     * Creates the object factory for the given factory type.
     *
     * @param str factory type name; only {@code "MQ"} (case-insensitive) is supported
     * @return an {@link MQObjectFactory} bound to this executor, or {@code null} for any other
     *         (or {@code null}) type name
     */
    protected ObjectFactory createObjectFactory(String str) {
        // Constant-first comparison so a null type name yields null instead of an NPE.
        if (!"MQ".equalsIgnoreCase(str)) {
            return null;
        }
        MQObjectFactory mqObjectFactory = new MQObjectFactory();
        mqObjectFactory.setExecutor(this);
        return mqObjectFactory;
    }

    /**
     * @return the display name of this service
     */
    public String getName() {
        return "RealtimeExecutor service";
    }

    /**
     * Replaces the object factory used by this executor.
     *
     * @param objectFactory factory to use from now on
     */
    public void setObjectFactory(ObjectFactory objectFactory) {
        this.objectFactory = objectFactory;
    }
}
