package kd.bos.schedule.server.broadcast;

import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kd.bos.cache.CacheFactory;
import kd.bos.cache.DistributeCacheHAPolicy;
import kd.bos.cache.DistributeSessionlessCache;
import kd.bos.context.RequestContext;
import kd.bos.dataentity.resource.ResManager;
import kd.bos.dlock.DLock;
import kd.bos.exception.BosErrorCode;
import kd.bos.exception.ErrorCode;
import kd.bos.exception.KDException;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.schedule.api.AbstractJobHandler;
import kd.bos.schedule.api.BroadcastTask;
import kd.bos.schedule.api.JobInfo;
import kd.bos.schedule.api.MessageInfo;
import kd.bos.schedule.api.ShardingUtil;
import kd.bos.schedule.api.StrategyType;
import kd.bos.schedule.api.Task;
import kd.bos.schedule.dao.dbImpl.TaskCache;
import kd.bos.schedule.message.JobProcessor;
import kd.bos.schedule.zk.ActiveKeyValueStore;
import kd.bos.schedule.zk.ZkConfig;
import kd.bos.threads.ThreadPool;
import kd.bos.threads.ThreadPools;

/* loaded from: input_file:kd/bos/schedule/server/broadcast/BroadcastJobHandler.class */
public class BroadcastJobHandler extends AbstractJobHandler {
    private static ThreadPool threadPool = ThreadPools.newFixedThreadPool("BroadcastJobHandler", ZkConfig.getNumOfWorkThread());
    private static Log log = LogFactory.getLog(BroadcastJobHandler.class);
    private static DistributeSessionlessCache cache = CacheFactory.getCommonCacheFactory().getDistributeSessionlessCache((String) null, new DistributeCacheHAPolicy(true, true));

    private ThreadPool getThreadPool() {
        return threadPool;
    }

    public void execute(RequestContext requestContext, MessageInfo messageInfo, JobInfo jobInfo) {
        String taskId = jobInfo.getTaskId();
        Future submit = getThreadPool().submit(() -> {
            String taskClassname = jobInfo.getTaskClassname();
            if (taskClassname == null) {
                throw new KDException(new ErrorCode("TASK_NAME_EMPTY", jobInfo.getName()), new Object[0]);
            }
            try {
                try {
                    BroadcastTask broadcastTask = (Task) Class.forName(taskClassname.trim()).newInstance();
                    setBroadcastParam(jobInfo);
                    broadcastTask.setTaskId(messageInfo.getTaskId());
                    broadcastTask.setMessageHandle(this);
                    if (!(broadcastTask instanceof BroadcastTask)) {
                        throw new KDException(BosErrorCode.cannotLoadBeanClass, new Object[]{ResManager.loadKDString("任务类没有实现BroadcastTask接口", "BroadcastJobHandler_0", "bos-schedule", new Object[0])});
                    }
                    broadcastTask.execute(requestContext, jobInfo.getParams(), ShardingUtil.getBroadcastVO());
                    return broadcastTask;
                } catch (IllegalAccessException | InstantiationException e) {
                    throw new KDException(e, new ErrorCode("TASK_CLASS_INIT_FAILED", taskClassname), new Object[0]);
                }
            } catch (ClassNotFoundException e2) {
                throw new KDException(e2, new ErrorCode("TASK_NAME_EMPTY", taskClassname), new Object[0]);
            }
        });
        try {
            int timeout = jobInfo.getTimeout();
            if (timeout <= 0) {
                submit.get();
            } else {
                submit.get(timeout, TimeUnit.SECONDS);
            }
        } catch (InterruptedException | ExecutionException e) {
            if (!isStop(taskId)) {
                throw new RuntimeException(e.toString(), e);
            }
            stop(taskId);
        } catch (TimeoutException e2) {
            if (StrategyType.WAITBEFORETASK.getValue().equals(jobInfo.getStrategy())) {
                String jobLockKey = JobProcessor.getJobLockKey(jobInfo);
                log.info("后台事务-超时锁释放:" + jobLockKey);
                DLock.forceUnlock(new String[]{jobLockKey});
            }
            TimeOut(taskId);
        }
    }

    private void setBroadcastParam(JobInfo jobInfo) {
        String appId = jobInfo.getAppId();
        ActiveKeyValueStore create = ActiveKeyValueStore.create();
        create.initAppIdExeServerMap(appId);
        TreeMap exeServerMap = create.getExeServerMap();
        Integer num = (Integer) exeServerMap.get(ZkConfig.getExecutorServerName());
        if (exeServerMap.size() == 0) {
            ShardingUtil.setBroadcastVO(new ShardingUtil.BroadcastVO(0, 0));
        } else if (num == null) {
            ShardingUtil.setBroadcastVO(new ShardingUtil.BroadcastVO(0, exeServerMap.size()));
        } else {
            ShardingUtil.setBroadcastVO(new ShardingUtil.BroadcastVO(num.intValue(), exeServerMap.size()));
        }
    }

    public static void TimeOut(String str) throws KDException {
        throw new KDException(new ErrorCode("TASK_TIMEOUT", str), new Object[0]);
    }

    public static void stop(String str) throws KDException {
        throw new KDException(new ErrorCode("TASK_STOPED_BY_USER", str), new Object[0]);
    }

    public boolean isStop(String str) {
        String str2 = (String) cache.get("TASK", RequestContext.get().getAccountId() + "_STOP_" + str);
        return (str2 == null || str2.length() <= 0) ? TaskCache.getStopStatus(str) : "stop".equalsIgnoreCase(str2);
    }
}
