package kd.bos.schedule.next.observable.filter;

import java.sql.Timestamp;
import java.sql.Types;
import java.text.FieldPosition;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import kd.bos.context.RequestContextCreator;
import kd.bos.dataentity.SqlParameter;
import kd.bos.db.DB;
import kd.bos.db.DBRoute;
import kd.bos.db.tx.TX;
import kd.bos.db.tx.TXHandle;
import kd.bos.dc.api.model.Account;
import kd.bos.dc.utils.AccountUtils;
import kd.bos.id.ID;
import kd.bos.instance.Instance;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.orm.ORM;
import kd.bos.orm.query.QFilter;
import kd.bos.orm.util.StringUtils;
import kd.bos.schedule.api.JobInfo;
import kd.bos.schedule.api.JobType;
import kd.bos.schedule.api.MessageInfo;
import kd.bos.schedule.api.MessageType;
import kd.bos.schedule.api.RouteMode;
import kd.bos.schedule.api.ScheduleInfo;
import kd.bos.schedule.api.TaskInfo;
import kd.bos.schedule.next.observable.IObservableConst;
import kd.bos.schedule.next.observable.IObservableDataFilter;
import kd.bos.schedule.next.observable.model.ObservableModel;
import kd.bos.schedule.next.observable.model.TaskTraceInfo;
import kd.bos.schedule.next.observable.model.TaskTraceStatusEnum;
import kd.bos.schedule.next.observable.model.TimerPulse;
import kd.bos.threads.ThreadPools;

/* loaded from: input_file:kd/bos/schedule/next/observable/filter/TaskTraceFilter.class */
public class TaskTraceFilter implements IObservableDataFilter {
    /** Name of the table targeted by the trace SQL statements issued from this class. */
    private static final String TASK_TRACE_TABLE = "t_sch_tasktrace";
    /** accountId -> (taskId -> record lines) collected on the server side before the task is handed to MQ. */
    private Map<String, Map<String, List<String>>> serverBeforeSendMQTaskStatus = new ConcurrentHashMap<>(16);
    /** accountId -> merged server-side traces waiting to be INSERTed by the flush thread. */
    private Map<String, LinkedBlockingQueue<TaskTraceInfo>> serverBeforeSendMQTaskTraceQueue = new ConcurrentHashMap<>(16);
    /** accountId -> traces for the MQ send result, applied by the flush thread through updateTaskTraceInfo. */
    private Map<String, LinkedBlockingQueue<TaskTraceInfo>> serverAfterSendMQTaskTraceQueue = new ConcurrentHashMap<>(16);
    /** accountId -> executor "push ready" / "receive broadcast" traces, applied through updateExecutorTaskTraceInfo. */
    private Map<String, LinkedBlockingQueue<TaskTraceInfo>> executorPushReadyTaskTraceQueue = new ConcurrentHashMap<>(16);
    /** accountId -> (taskId -> record lines) collected on the executor side before the task reaches a final status. */
    private Map<String, Map<String, List<String>>> executorBeforeCompleteTaskStatus = new ConcurrentHashMap<>(16);
    /** accountId -> merged executor-side final traces, applied by the flush thread through updateTaskTraceInfo. */
    private Map<String, LinkedBlockingQueue<TaskTraceInfo>> executorCompleteTaskTraceQueue = new ConcurrentHashMap<>(16);
    /**
     * Periodic flusher for the per-account trace queues.
     *
     * <p>Each cycle sleeps 60 seconds, then walks every account returned by
     * {@code getAllAccountsOfCurrentEnv()}, calls {@code RequestContextCreator.createBatch} for it
     * and pushes the content of the four queues to the database. A failure in one queue or one
     * account is logged and does not stop the remaining work of the cycle.
     *
     * <p>Interrupting the thread while it sleeps ends the loop; the interrupt flag is restored so
     * that the owner of the thread can observe it.
     */
    private Thread taskTraceFilterThread = new Thread(() -> {
        while (true) {
            try {
                Thread.sleep(60000L);
            } catch (InterruptedException interrupted) {
                // An interrupt is a stop request: keep the flag and leave instead of looping forever.
                Thread.currentThread().interrupt();
                logger.error("Schedule***TaskTraceFilter task trace filter error", interrupted);
                return;
            }
            try {
                List<Account> accounts = getAllAccountsOfCurrentEnv();
                long cycleStart = System.currentTimeMillis();
                for (Account account : accounts) {
                    try {
                        RequestContextCreator.createBatch(account.getTenantId(), account.getAccountId(), (String) null);

                        // 1. Server side, before MQ: brand new rows.
                        LinkedBlockingQueue<TaskTraceInfo> beforeSendQueue = this.serverBeforeSendMQTaskTraceQueue.get(account.getAccountId());
                        if (beforeSendQueue != null && !beforeSendQueue.isEmpty()) {
                            try {
                                insertTaskTraceInfo(beforeSendQueue);
                            } catch (Throwable insertError) {
                                logger.error("Schedule***TaskTraceFilter serverBeforeSendMQTaskTrace insert error", insertError);
                            }
                        }

                        // 2. Server side, MQ send result.
                        LinkedBlockingQueue<TaskTraceInfo> afterSendQueue = this.serverAfterSendMQTaskTraceQueue.get(account.getAccountId());
                        if (afterSendQueue != null && !afterSendQueue.isEmpty()) {
                            try {
                                updateTaskTraceInfo(afterSendQueue);
                            } catch (Throwable updateError) {
                                logger.error("Schedule***TaskTraceFilter serverAfterSendMQTaskTrace update error", updateError);
                            }
                        }

                        // 3. Executor side, task pushed to the ready queue / broadcast received.
                        LinkedBlockingQueue<TaskTraceInfo> pushReadyQueue = this.executorPushReadyTaskTraceQueue.get(account.getAccountId());
                        if (pushReadyQueue != null && !pushReadyQueue.isEmpty()) {
                            try {
                                updateExecutorTaskTraceInfo(pushReadyQueue);
                            } catch (Throwable updateError) {
                                logger.error("Schedule***TaskTraceFilter executorPushReadyTaskTrace update error", updateError);
                            }
                        }

                        // 4. Executor side, final status of the task.
                        LinkedBlockingQueue<TaskTraceInfo> completeQueue = this.executorCompleteTaskTraceQueue.get(account.getAccountId());
                        if (completeQueue != null && !completeQueue.isEmpty()) {
                            try {
                                updateTaskTraceInfo(completeQueue);
                            } catch (Throwable updateError) {
                                logger.error("Schedule***TaskTraceFilter executorCompleteTaskTrace update error", updateError);
                            }
                        }
                    } catch (Throwable accountError) {
                        logger.error("Schedule***TaskTraceFilter account error,accountId:{}", account.getAccountId(), accountError);
                    }
                }
                logger.debug("Schedule***TaskTraceFilter task trace costTime : {}", Long.valueOf(System.currentTimeMillis() - cycleStart));
            } catch (Throwable cycleError) {
                logger.error("Schedule***TaskTraceFilter task trace filter error", cycleError);
            }
        }
    });
    private static final Log logger = LogFactory.getLog(TaskTraceFilter.class);
    /** Database route used by every trace statement of this class. */
    private static final DBRoute Sch_Route = DBRoute.basedata;
    /** Upper bound applied to each per-account status map and trace queue before new entries are dropped. */
    private static final Integer CACHE_TASK_RECORD_MAXSIZE = 50000;
    /** Maximum number of rows sent to the database in one batch. */
    private static final Integer BATCH_EXECUTE_SQL_NUMBER = 500;
    /**
     * Timestamp formatter for trace records.
     *
     * <p>{@link SimpleDateFormat} keeps mutable state and is not safe for concurrent use, while this
     * instance is shared by the event threads and the flush thread. Formatting and parsing are
     * therefore serialized on the instance itself.
     *
     * <p>NOTE(review): the trailing {@code sss} prints the seconds a second time (zero padded to
     * three digits); milliseconds would be {@code SSS}. The pattern is left untouched because
     * already stored records and possible readers of them use it - confirm before changing.
     */
    private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:sss") {
        @Override
        public synchronized StringBuffer format(Date date, StringBuffer toAppendTo, FieldPosition pos) {
            return super.format(date, toAppendTo, pos);
        }

        @Override
        public synchronized Date parse(String text, ParsePosition pos) {
            return super.parse(text, pos);
        }
    };

    /**
     * Routes an observable event to the trace recorder that matches its payload and classification.
     *
     * <ul>
     *   <li>{@link TimerPulse} payloads are server-side scheduling events.</li>
     *   <li>{@link MessageInfo} payloads are MQ / executor events; only BIZJOB and BROADCASTJOB
     *       messages whose job comes from the scheduler are traced.</li>
     *   <li>{@link TaskInfo} payloads are only handled for the "abort by reboot" classification.</li>
     * </ul>
     * Events whose job does not have task tracing enabled, or whose classification is unknown or
     * {@code null}, are ignored.
     *
     * @param observableModel the event to inspect
     */
    @Override // kd.bos.schedule.next.observable.IObservableDataFilter
    public void handle(ObservableModel observableModel) {
        Object data = observableModel.getData();
        if (data instanceof TimerPulse) {
            dispatchTimerPulse(observableModel, (TimerPulse) data);
        } else if (data instanceof MessageInfo) {
            dispatchMessage(observableModel, (MessageInfo) data);
        } else if ((data instanceof TaskInfo) && StringUtils.endsWithIgnoreCase(observableModel.getClassfiy(), IObservableConst.Classify.Client.abortTaskByReboot)) {
            abortTaskTraceInfo(observableModel);
        }
    }

    /** Handles the server-side events that carry a {@link TimerPulse}. */
    private void dispatchTimerPulse(ObservableModel observableModel, TimerPulse timerPulse) {
        if (!isSupportTaskTrace(timerPulse)) {
            return;
        }
        String classify = observableModel.getClassfiy();
        if (classify == null) {
            // Nothing to match against; a switch on a null string would throw.
            return;
        }
        String accountId = observableModel.getAccountId();
        String tenantId = observableModel.getTenantId();
        switch (classify) {
            case IObservableConst.Classify.Server.generateTime:
                addGenerateTimeTaskTraceStatus(timerPulse, accountId, tenantId);
                return;
            case IObservableConst.Classify.Server.accountNotExist:
                addTaskNotAllowedFire(timerPulse, accountId, tenantId, TaskTraceStatusEnum.SERVER_ACCOUNT_NOT_EXIST);
                return;
            case IObservableConst.Classify.Server.scheduleVisitorPaused:
                addTaskNotAllowedFire(timerPulse, accountId, tenantId, TaskTraceStatusEnum.SERVER_SCHEDULE_VISITOR_PAUSED);
                return;
            case IObservableConst.Classify.Server.pushLocalQueueSuccess:
                addTaskTraceStatusBeforeSendMQ(timerPulse, accountId, tenantId, TaskTraceStatusEnum.SERVER_PUSH_LOCAL_QUEUE);
                return;
            case IObservableConst.Classify.Server.popLocalQueueSuccess:
                addTaskTraceStatusBeforeSendMQ(timerPulse, accountId, tenantId, TaskTraceStatusEnum.SERVER_POP_LOCAL_QUEUE);
                return;
            default:
                return;
        }
    }

    /** Handles the MQ and executor events that carry a {@link MessageInfo}. */
    private void dispatchMessage(ObservableModel observableModel, MessageInfo messageInfo) {
        JobInfo jobInfo = messageInfo.fetchJobInfo();
        // isSupportTaskTrace already rejects a null job, so jobInfo is non-null below.
        if (!isSupportTaskTrace(jobInfo)) {
            return;
        }
        MessageType messageType = messageInfo.getMessageType();
        boolean traceableType = messageType == MessageType.BIZJOB || messageType == MessageType.BROADCASTJOB;
        if (!traceableType || !jobInfo.isFromScheduler()) {
            return;
        }
        String classify = observableModel.getClassfiy();
        if (classify == null) {
            return;
        }
        switch (classify) {
            case IObservableConst.Classify.Server.beginSendMQ:
                addTaskTraceStatusSendMQ(messageInfo);
                return;
            case IObservableConst.Classify.Server.sendMQSuccess:
                addTaskTraceStatusAfterSendMQ(messageInfo, TaskTraceStatusEnum.SERVER_SEND_MQ_SUCCESS);
                return;
            case IObservableConst.Classify.Server.sendMQERROR:
                addTaskTraceStatusAfterSendMQ(messageInfo, TaskTraceStatusEnum.SERVER_ERROR);
                return;
            case IObservableConst.Classify.Client.pushLocalReadyQueueSuccess:
                addTaskTraceStatusPushReady(messageInfo);
                return;
            case IObservableConst.Classify.Client.popLocalReadyQueueSuccess:
                addTaskTraceStatusBeforeComplete(messageInfo, TaskTraceStatusEnum.EXECUTOR_POP_READY);
                return;
            case IObservableConst.Classify.Client.receiveBroadcastTask:
                addTaskTraceStatusForBroadcast(messageInfo);
                return;
            case IObservableConst.Classify.Client.submitThreadPool:
                addTaskTraceStatusBeforeComplete(messageInfo, TaskTraceStatusEnum.EXECUTOR_SUBMIT_THREAD_POOL);
                return;
            case IObservableConst.Classify.Client.beginExecuteTask:
                addTaskTraceStatusBeforeComplete(messageInfo, TaskTraceStatusEnum.EXECUTOR_BEGIN_TASK);
                return;
            case IObservableConst.Classify.Client.completeTaskSuccess:
                addTaskTraceStatusComplete(messageInfo, TaskTraceStatusEnum.EXECUTOR_COMPLETE_TASK);
                return;
            case IObservableConst.Classify.Client.haveNoMessageHandlers:
                addTaskTraceStatusComplete(messageInfo, TaskTraceStatusEnum.EXECUTOR_HAVE_NO_MESSAGE_HANDLE);
                return;
            case IObservableConst.Classify.Client.threadError:
                addTaskTraceStatusComplete(messageInfo, TaskTraceStatusEnum.EXECUTOR_ERROR);
                return;
            case IObservableConst.Classify.Client.abortTaskByUser:
                addTaskTraceStatusComplete(messageInfo, TaskTraceStatusEnum.EXECUTOR_TASK_ABORTED);
                return;
            case IObservableConst.Classify.Client.skipTask:
                addTaskTraceStatusComplete(messageInfo, TaskTraceStatusEnum.EXECUTOR_TASK_SKIP);
                return;
            case IObservableConst.Classify.Client.timeoutTask:
                addTaskTraceStatusComplete(messageInfo, TaskTraceStatusEnum.EXECUTOR_TASK_TIMEOUT);
                return;
            default:
                return;
        }
    }

    /**
     * Tells whether the job behind a timer pulse has task tracing enabled.
     *
     * @param timerPulse the pulse to inspect
     * @return {@code false} when the pulse carries no schedule info, otherwise the answer for its job
     */
    private boolean isSupportTaskTrace(TimerPulse timerPulse) {
        ScheduleInfo scheduleInfo = timerPulse.getInfo();
        return scheduleInfo != null && isSupportTaskTrace(scheduleInfo.getJobInfo());
    }

    /**
     * Tells whether a job has task tracing enabled.
     *
     * @param jobInfo the job to inspect, may be {@code null}
     * @return {@code true} only for a non-null job whose {@code isTaskTrace()} flag is set
     */
    private boolean isSupportTaskTrace(JobInfo jobInfo) {
        if (jobInfo == null) {
            return false;
        }
        return jobInfo.isTaskTrace();
    }

    /**
     * Starts the server-side record list of a task with its "generate time" line.
     *
     * <p>The line holds the current time, the SERVER_GENERATE_TIME status code, the schedule's last
     * execute time and the id of this instance. Any list already stored for the task is replaced.
     * Nothing is recorded once the account's status map holds more than
     * {@code CACHE_TASK_RECORD_MAXSIZE} tasks.
     *
     * @param timerPulse the pulse that produced the task
     * @param str        account id used as key of the status map
     * @param str2       tenant id (not used here)
     */
    private void addGenerateTimeTaskTraceStatus(TimerPulse timerPulse, String str, String str2) {
        ScheduleInfo scheduleInfo = timerPulse.getInfo();
        Map<String, List<String>> statusByTask = this.serverBeforeSendMQTaskStatus.computeIfAbsent(str, key -> new ConcurrentHashMap());
        if (statusByTask.size() > CACHE_TASK_RECORD_MAXSIZE.intValue()) {
            return;
        }
        List<String> records = Collections.synchronizedList(new ArrayList());

        StringBuilder line = new StringBuilder();
        line.append(sdf.format(new Date()));
        line.append(IObservableConst.Classify.Common.TASK_TRACE_SPLIT);
        line.append(TaskTraceStatusEnum.SERVER_GENERATE_TIME.getStatusCode());
        line.append(IObservableConst.Classify.Common.TASK_TRACE_SPLIT);
        line.append(sdf.format(Date.from(scheduleInfo.getLastExecuteTime().toInstant())));
        line.append(":");
        line.append(Instance.getInstanceId());
        line.append("\r\n");

        records.add(line.toString());
        statusByTask.put(timerPulse.getTaskId(), records);
    }

    /**
     * Closes the server-side trace of a task that was not allowed to fire and queues it for insertion.
     *
     * <p>The record lines cached for the task are taken out of the status map, completed with the
     * given final status and put on the account's insert queue. A task without cached lines (for
     * example because the cache was full when it was generated) is traced with the final status
     * only. When the queue already holds more than {@code CACHE_TASK_RECORD_MAXSIZE} entries the
     * trace is dropped; the cached lines are released either way because no later event consumes them.
     *
     * @param timerPulse          the pulse of the rejected task
     * @param str                 account id
     * @param str2                tenant id (not used here)
     * @param taskTraceStatusEnum the final status to record
     */
    private void addTaskNotAllowedFire(TimerPulse timerPulse, String str, String str2, TaskTraceStatusEnum taskTraceStatusEnum) {
        String taskId = timerPulse.getTaskId();
        Map<String, List<String>> pendingByTask = this.serverBeforeSendMQTaskStatus.get(str);
        // remove() instead of get(): this is a terminal event, the cached lines must not stay behind.
        List<String> pending = pendingByTask == null ? null : pendingByTask.remove(taskId);
        if (pending == null) {
            pending = Collections.emptyList();
        }
        LinkedBlockingQueue<TaskTraceInfo> insertQueue = this.serverBeforeSendMQTaskTraceQueue.computeIfAbsent(str, key -> new LinkedBlockingQueue<>());
        if (insertQueue.size() > CACHE_TASK_RECORD_MAXSIZE.intValue()) {
            return;
        }
        TaskTraceInfo trace = mergeTaskTraceLog(pending, taskTraceStatusEnum, timerPulse);
        try {
            insertQueue.put(trace);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the calling thread.
            Thread.currentThread().interrupt();
            logger.error("Schedule***TaskTraceFilter BEFORE_SEND_MQ_TASK_TRACE_QUEUE IS INTERRUPTED, taskId : {}", taskId, e);
        }
    }

    /**
     * Builds the trace of a task from a timer pulse.
     *
     * <p>Ids come from the pulse and its schedule info. Group id and message type are only set when
     * the schedule carries a job: a SHARDINGBROADCAST route maps to BROADCASTJOB, any other route to
     * BIZJOB. The task record is the concatenation of {@code list} followed by one closing
     * "time / status" entry without a trailing line break.
     *
     * @param list                record lines collected so far
     * @param taskTraceStatusEnum status to store and to append as last entry
     * @param timerPulse          source of the task and schedule data
     * @return the populated trace
     */
    private TaskTraceInfo mergeTaskTraceLog(List<String> list, TaskTraceStatusEnum taskTraceStatusEnum, TimerPulse timerPulse) {
        ScheduleInfo scheduleInfo = timerPulse.getInfo();
        TaskTraceInfo trace = new TaskTraceInfo();
        trace.setTaskId(timerPulse.getTaskId());
        trace.setScheduleId(scheduleInfo.getId());
        trace.setJobId(scheduleInfo.getJobId());
        trace.setStatus(taskTraceStatusEnum.getStatusCode());

        JobInfo job = scheduleInfo.getJobInfo();
        if (job != null) {
            trace.setGroupId(job.getGroupId());
            trace.setMessageType(job.getRouteMode() == RouteMode.SHARDINGBROADCAST ? MessageType.BROADCASTJOB : MessageType.BIZJOB);
        }

        StringBuilder record = new StringBuilder();
        for (String line : list) {
            record.append(line);
        }
        record.append(sdf.format(new Date()));
        record.append(IObservableConst.Classify.Common.TASK_TRACE_SPLIT);
        record.append(taskTraceStatusEnum.getStatusCode());
        trace.setTaskRecord(record.toString());
        return trace;
    }

    /**
     * Builds the trace of a task from an MQ message.
     *
     * <p>Ids, group and message type come from the message and its job. The task record is the
     * concatenation of {@code list} followed by one "time / status" entry terminated by a line break.
     *
     * @param list                record lines collected so far
     * @param taskTraceStatusEnum status to store and to append as last entry
     * @param messageInfo         source of the task and job data; its job must not be {@code null}
     * @return the populated trace
     */
    private TaskTraceInfo mergeTaskTraceLog(List<String> list, TaskTraceStatusEnum taskTraceStatusEnum, MessageInfo messageInfo) {
        JobInfo job = messageInfo.fetchJobInfo();
        TaskTraceInfo trace = new TaskTraceInfo();
        trace.setTaskId(messageInfo.getTaskId());
        trace.setScheduleId(job.getScheduleId());
        trace.setJobId(job.getId());
        trace.setStatus(taskTraceStatusEnum.getStatusCode());
        trace.setGroupId(job.getGroupId());
        trace.setMessageType(messageInfo.getMessageType());

        StringBuilder record = new StringBuilder();
        for (String line : list) {
            record.append(line);
        }
        record.append(sdf.format(new Date()));
        record.append(IObservableConst.Classify.Common.TASK_TRACE_SPLIT);
        record.append(taskTraceStatusEnum.getStatusCode());
        record.append("\r\n");
        trace.setTaskRecord(record.toString());
        return trace;
    }

    /**
     * Appends one intermediate server-side status line to the cached record list of a task.
     *
     * <p>The account's status map and the task's list are created on demand. Nothing is recorded
     * once the account's map holds more than {@code CACHE_TASK_RECORD_MAXSIZE} tasks.
     *
     * @param timerPulse          the pulse of the task
     * @param str                 account id
     * @param str2                tenant id (not used here)
     * @param taskTraceStatusEnum the status to append
     */
    private void addTaskTraceStatusBeforeSendMQ(TimerPulse timerPulse, String str, String str2, TaskTraceStatusEnum taskTraceStatusEnum) {
        Map<String, List<String>> statusByTask = this.serverBeforeSendMQTaskStatus.computeIfAbsent(str, key -> new ConcurrentHashMap());
        boolean cacheFull = statusByTask.size() > CACHE_TASK_RECORD_MAXSIZE.intValue();
        if (!cacheFull) {
            List<String> records = statusByTask.computeIfAbsent(timerPulse.getTaskId(), key -> Collections.synchronizedList(new ArrayList()));
            records.add(generateTaskTraceRecord(taskTraceStatusEnum, sdf));
        }
    }

    /**
     * Closes the server-side "before MQ" phase of a task and queues its trace for insertion.
     *
     * <p>The record lines cached for the task are taken out of the status map, completed with
     * SERVER_BEGIN_SEND_MQ and put on the account's insert queue. The size limit is only applied to
     * the queue: this method drains the status map, so refusing to run when that map is over its
     * limit would prevent the map from ever shrinking through this path. The cached lines are
     * released even when the trace has to be dropped because the queue is full.
     *
     * @param messageInfo the message about to be sent to MQ
     */
    private void addTaskTraceStatusSendMQ(MessageInfo messageInfo) {
        String accountId = messageInfo.getAccountId();
        String taskId = messageInfo.getTaskId();
        Map<String, List<String>> pendingByTask = this.serverBeforeSendMQTaskStatus.get(accountId);
        List<String> pending = pendingByTask == null ? null : pendingByTask.remove(taskId);
        if (pending == null) {
            pending = Collections.emptyList();
        }
        LinkedBlockingQueue<TaskTraceInfo> insertQueue = this.serverBeforeSendMQTaskTraceQueue.computeIfAbsent(accountId, key -> new LinkedBlockingQueue<>());
        if (insertQueue.size() > CACHE_TASK_RECORD_MAXSIZE.intValue()) {
            return;
        }
        TaskTraceInfo trace = mergeTaskTraceLog(pending, TaskTraceStatusEnum.SERVER_BEGIN_SEND_MQ, messageInfo);
        try {
            insertQueue.put(trace);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the calling thread.
            Thread.currentThread().interrupt();
            logger.error("Schedule***TaskTraceFilter BEFORE_SEND_MQ_TASK_TRACE_QUEUE IS INTERRUPTED, taskId : {}", taskId, e);
        }
    }

    /**
     * Queues the result of the MQ send (success or error) as an update of the task's trace.
     *
     * <p>The trace carries only the given status entry. It is dropped when the account's queue
     * already holds more than {@code CACHE_TASK_RECORD_MAXSIZE} entries.
     *
     * @param messageInfo         the message that was sent
     * @param taskTraceStatusEnum the send result to record
     */
    private void addTaskTraceStatusAfterSendMQ(MessageInfo messageInfo, TaskTraceStatusEnum taskTraceStatusEnum) {
        TaskTraceInfo trace = mergeTaskTraceLog(Collections.<String>emptyList(), taskTraceStatusEnum, messageInfo);
        LinkedBlockingQueue<TaskTraceInfo> updateQueue = this.serverAfterSendMQTaskTraceQueue.computeIfAbsent(messageInfo.getAccountId(), key -> new LinkedBlockingQueue<>());
        if (updateQueue.size() > CACHE_TASK_RECORD_MAXSIZE.intValue()) {
            return;
        }
        try {
            updateQueue.put(trace);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the calling thread.
            Thread.currentThread().interrupt();
            logger.error("Schedule***TaskTraceFilter AFTER_SEND_MQ_TASK_TRACE_QUEUE IS INTERRUPTED, taskId : {}", messageInfo.getTaskId(), e);
        }
    }

    /**
     * Queues the EXECUTOR_PUSH_READY status of a task as an update of its trace.
     *
     * <p>The trace is dropped when the account's queue already holds more than
     * {@code CACHE_TASK_RECORD_MAXSIZE} entries.
     *
     * @param messageInfo the message pushed to the executor's ready queue
     */
    private void addTaskTraceStatusPushReady(MessageInfo messageInfo) {
        LinkedBlockingQueue<TaskTraceInfo> updateQueue = this.executorPushReadyTaskTraceQueue.computeIfAbsent(messageInfo.getAccountId(), key -> new LinkedBlockingQueue<>());
        TaskTraceInfo trace = mergeTaskTraceLog(Collections.<String>emptyList(), TaskTraceStatusEnum.EXECUTOR_PUSH_READY, messageInfo);
        if (updateQueue.size() > CACHE_TASK_RECORD_MAXSIZE.intValue()) {
            return;
        }
        try {
            updateQueue.put(trace);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the calling thread.
            Thread.currentThread().interrupt();
            logger.error("Schedule***TaskTraceFilter PUSH_READY_TASK_TRACE_QUEUE IS INTERRUPTED, taskId : {}", messageInfo.getTaskId(), e);
        }
    }

    /**
     * Queues the EXECUTOR_REVEICE_BROADCAST_TASK status of a task as an update of its trace.
     *
     * <p>Uses the same per-account queue as the "push ready" status. The trace is dropped when that
     * queue already holds more than {@code CACHE_TASK_RECORD_MAXSIZE} entries.
     *
     * @param messageInfo the broadcast message received by the executor
     */
    private void addTaskTraceStatusForBroadcast(MessageInfo messageInfo) {
        LinkedBlockingQueue<TaskTraceInfo> updateQueue = this.executorPushReadyTaskTraceQueue.computeIfAbsent(messageInfo.getAccountId(), key -> new LinkedBlockingQueue<>());
        TaskTraceInfo trace = mergeTaskTraceLog(Collections.<String>emptyList(), TaskTraceStatusEnum.EXECUTOR_REVEICE_BROADCAST_TASK, messageInfo);
        if (updateQueue.size() > CACHE_TASK_RECORD_MAXSIZE.intValue()) {
            return;
        }
        try {
            updateQueue.put(trace);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the calling thread.
            Thread.currentThread().interrupt();
            logger.error("Schedule***TaskTraceFilter PUSH_READY_TASK_TRACE_QUEUE IS INTERRUPTED, taskId : {}", messageInfo.getTaskId(), e);
        }
    }

    /**
     * Appends one intermediate executor-side status line to the cached record list of a task.
     *
     * <p>The account's status map and the task's list are created on demand. Nothing is recorded
     * once the account's map holds more than {@code CACHE_TASK_RECORD_MAXSIZE} tasks.
     *
     * @param messageInfo         the message of the task
     * @param taskTraceStatusEnum the status to append
     */
    private void addTaskTraceStatusBeforeComplete(MessageInfo messageInfo, TaskTraceStatusEnum taskTraceStatusEnum) {
        Map<String, List<String>> statusByTask = this.executorBeforeCompleteTaskStatus.computeIfAbsent(messageInfo.getAccountId(), key -> new ConcurrentHashMap());
        boolean cacheFull = statusByTask.size() > CACHE_TASK_RECORD_MAXSIZE.intValue();
        if (!cacheFull) {
            List<String> records = statusByTask.computeIfAbsent(messageInfo.getTaskId(), key -> Collections.synchronizedList(new ArrayList()));
            records.add(generateTaskTraceRecord(taskTraceStatusEnum, sdf));
        }
    }

    /**
     * Closes the executor-side trace of a task with its final status and queues it for the update.
     *
     * <p>The record lines cached for the task are taken out of the status map, completed with the
     * given status and put on the account's queue. An account or task without cached lines is
     * traced with the final status only instead of failing. The size limit is only applied to the
     * queue: this method is the one that drains the status map, so refusing to run when that map is
     * over its limit would prevent it from ever shrinking. The cached lines are released even when
     * the trace has to be dropped because the queue is full.
     *
     * @param messageInfo         the message of the finished task
     * @param taskTraceStatusEnum the final status to record
     */
    private void addTaskTraceStatusComplete(MessageInfo messageInfo, TaskTraceStatusEnum taskTraceStatusEnum) {
        String accountId = messageInfo.getAccountId();
        String taskId = messageInfo.getTaskId();
        Map<String, List<String>> pendingByTask = this.executorBeforeCompleteTaskStatus.get(accountId);
        List<String> pending = pendingByTask == null ? null : pendingByTask.remove(taskId);
        if (pending == null) {
            pending = Collections.emptyList();
        }
        LinkedBlockingQueue<TaskTraceInfo> updateQueue = this.executorCompleteTaskTraceQueue.computeIfAbsent(accountId, key -> new LinkedBlockingQueue<>());
        if (updateQueue.size() > CACHE_TASK_RECORD_MAXSIZE.intValue()) {
            return;
        }
        TaskTraceInfo trace = mergeTaskTraceLog(pending, taskTraceStatusEnum, messageInfo);
        try {
            updateQueue.put(trace);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the calling thread.
            Thread.currentThread().interrupt();
            logger.error("Schedule***TaskTraceFilter COMPLETE_TASK_TRACE_QUEUE IS INTERRUPTED, taskId : {}", taskId, e);
        }
    }

    /**
     * Formats one record line: current time, separator, status code, line break.
     *
     * @param taskTraceStatusEnum status whose code is written
     * @param simpleDateFormat    formatter applied to the current time
     * @return the record line, terminated by "\r\n"
     */
    private String generateTaskTraceRecord(TaskTraceStatusEnum taskTraceStatusEnum, SimpleDateFormat simpleDateFormat) {
        StringBuilder line = new StringBuilder();
        line.append(simpleDateFormat.format(new Date()));
        line.append(IObservableConst.Classify.Common.TASK_TRACE_SPLIT);
        line.append(taskTraceStatusEnum.getStatusCode());
        line.append("\r\n");
        return line.toString();
    }

    /**
     * Records EXECUTOR_TASK_ABORTED for a business task directly in the database.
     *
     * <p>Only acts on {@link TaskInfo} payloads whose job type is BIZ. When
     * {@code existTaskTraceInfo} reports an existing trace for the task, the aborted entry is
     * appended to its record and its status and modify time are updated; otherwise a new row is
     * inserted. Each statement runs in its own transaction; a failing statement is logged and its
     * transaction is marked for rollback, it is not rethrown.
     *
     * @param observableModel the event carrying the aborted task and the tenant/account to run under
     */
    private void abortTaskTraceInfo(ObservableModel observableModel) {
        if (!(observableModel.getData() instanceof TaskInfo)) {
            return;
        }
        TaskInfo taskInfo = (TaskInfo) observableModel.getData();
        if (taskInfo.getJobType() != JobType.BIZ) {
            return;
        }
        RequestContextCreator.createBatch(observableModel.getTenantId(), observableModel.getAccountId(), (String) null);
        boolean traceExists = existTaskTraceInfo(taskInfo.getId());

        StringBuilder record = new StringBuilder();
        record.append(sdf.format(new Date())).append(IObservableConst.Classify.Common.TASK_TRACE_SPLIT).append(TaskTraceStatusEnum.EXECUTOR_TASK_ABORTED.getStatusCode());
        Timestamp modifyTime = new Timestamp(System.currentTimeMillis());

        if (traceExists) {
            SqlParameter[] updateParams = {
                new SqlParameter(":ftaskrecord", Types.VARCHAR, record.toString()),
                new SqlParameter(":fmodifytime", Types.TIMESTAMP, modifyTime),
                new SqlParameter(":fstatus", Types.VARCHAR, TaskTraceStatusEnum.EXECUTOR_TASK_ABORTED.getStatusCode()),
                new SqlParameter(":ftaskid", Types.VARCHAR, taskInfo.getId())
            };
            executeAbortStatement("abortTaskTraceInfo", "update t_sch_tasktrace SET FTASKRECORD = CONCAT(FTASKRECORD, ?), FMODIFYTIME = ?, FSTATUS = ? WHERE FTASKID = ?", updateParams, "Schedule***TaskTraceFilter abortTaskTraceInfo error");
            return;
        }

        SqlParameter[] insertParams = {
            new SqlParameter(":FID", Types.BIGINT, Long.valueOf(ID.genLongId())),
            // The task id is a String: it was bound as TIMESTAMP (93), now as VARCHAR like in the update.
            new SqlParameter(":FTASKID", Types.VARCHAR, taskInfo.getId()),
            new SqlParameter(":FTASKRECORD", Types.VARCHAR, record.toString()),
            new SqlParameter(":FMODIFYTIME", Types.TIMESTAMP, modifyTime),
            new SqlParameter(":FSCHEDULEID", Types.VARCHAR, taskInfo.getScheduleId()),
            new SqlParameter(":FJOBID", Types.VARCHAR, taskInfo.getJobId()),
            new SqlParameter(":FSTATUS", Types.VARCHAR, TaskTraceStatusEnum.EXECUTOR_TASK_ABORTED.getStatusCode())
        };
        executeAbortStatement("taskTraceFilterInsert", "INSERT INTO t_sch_tasktrace (FID,FTASKID,FTASKRECORD,FMODIFYTIME,FSCHEDULEID,FJOBID,FSTATUS) values(?,?,?,?,?,?,?)", insertParams, "Schedule***TaskTraceFilter taskTraceFilterInsert error");
    }

    /** Runs one statement in its own transaction; on failure logs {@code errorMessage} and marks the rollback. */
    private void executeAbortStatement(String txName, String sql, SqlParameter[] params, String errorMessage) {
        try (TXHandle tx = TX.required(txName)) {
            try {
                DB.execute(Sch_Route, sql, params);
            } catch (Exception e) {
                logger.error(errorMessage, e);
                tx.markRollback();
            }
        }
    }

    /**
     * Checks whether the {@code sch_tasktracerecord} entity already holds a
     * record for the given task.
     *
     * @param str the task id matched against the {@code taskid} field
     * @return {@code true} if the ORM reports at least one matching record
     */
    private boolean existTaskTraceInfo(String str) {
        QFilter byTaskId = new QFilter("taskid", "=", str);
        QFilter[] filters = {byTaskId};
        return ORM.create().exists("sch_tasktracerecord", filters);
    }

    /**
     * Drains the given queue and inserts its entries through
     * {@link #batchInsertTaskTraceInfo(List)} in chunks of
     * {@code BATCH_EXECUTE_SQL_NUMBER} rows; any remainder is inserted last.
     *
     * <p>The queue is drained with the non-blocking {@code poll()} rather than
     * an {@code isEmpty()} check followed by {@code take()}: if another
     * consumer emptied the queue between those two calls, {@code take()} would
     * block this thread indefinitely.
     *
     * @param linkedBlockingQueue queue of pending trace entries; drained until
     *                            {@code poll()} returns {@code null}
     */
    private void insertTaskTraceInfo(LinkedBlockingQueue<TaskTraceInfo> linkedBlockingQueue) {
        final int batchSize = BATCH_EXECUTE_SQL_NUMBER.intValue();
        List<TaskTraceInfo> batch = new ArrayList<>();
        TaskTraceInfo next;
        while ((next = linkedBlockingQueue.poll()) != null) {
            batch.add(next);
            if (batch.size() == batchSize) {
                batchInsertTaskTraceInfo(batch);
                batch.clear();
            }
        }
        if (!batch.isEmpty()) {
            batchInsertTaskTraceInfo(batch);
        }
    }

    /**
     * Inserts the given trace entries into {@code t_sch_tasktrace} with a
     * single batched statement inside a {@code taskTraceFilterInsert}
     * transaction.
     *
     * <p>Each row gets a freshly generated id and the current time as its
     * modify time. A {@code null} schedule id or job id is stored as a single
     * space. Rows whose batch result is {@code 0} are logged with their task id.
     * If the batch throws, the error is logged and the transaction is marked
     * for rollback so a partially applied batch is not committed; the
     * exception is not propagated.
     *
     * @param list trace entries to insert
     */
    private void batchInsertTaskTraceInfo(List<TaskTraceInfo> list) {
        List<Object[]> rows = new ArrayList<>(list.size());
        for (TaskTraceInfo taskTraceInfo : list) {
            rows.add(new Object[]{Long.valueOf(ID.genLongId()), taskTraceInfo.getTaskId(), taskTraceInfo.getTaskRecord(), new Timestamp(System.currentTimeMillis()), taskTraceInfo.getScheduleId() != null ? taskTraceInfo.getScheduleId() : " ", taskTraceInfo.getJobId() != null ? taskTraceInfo.getJobId() : " ", taskTraceInfo.getStatus(), Long.valueOf(taskTraceInfo.getGroupId())});
        }
        // try-with-resources closes the transaction handle on every path.
        try (TXHandle required = TX.required("taskTraceFilterInsert")) {
            try {
                int[] executeBatch = DB.executeBatch(Sch_Route, "INSERT INTO t_sch_tasktrace (FID,FTASKID,FTASKRECORD,FMODIFYTIME,FSCHEDULEID,FJOBID,FSTATUS,FGROUPID) VALUES(?,?,?,?,?,?,?,?)", rows);
                for (int i = 0; i < executeBatch.length; i++) {
                    // The bounds check guards against a result array longer than the input list.
                    if (executeBatch[i] == 0 && list.size() > i) {
                        logger.error("Schedule***TaskTraceFilter insert task trace info error,taskId={}", list.get(i).getTaskId());
                    }
                }
            } catch (Exception e) {
                logger.error("Schedule***TaskTraceFilter taskTraceFilterInsert error", e);
                // Same handling as the single-row insert path in this class.
                required.markRollback();
            }
        }
    }

    /**
     * Drains the given queue, applying each entry through
     * {@link #updateTaskTraceInfoStatus(TaskTraceInfo, List)}, then puts every
     * entry that was not updated back on the same queue and logs their task ids.
     *
     * <p>The queue is drained with the non-blocking {@code poll()} so the
     * thread cannot block if the queue is emptied between an emptiness check
     * and the removal. If the thread is interrupted while re-queueing, the
     * {@code put} is retried so the entry is not lost, and the interrupt
     * status is restored before returning.
     *
     * @param linkedBlockingQueue queue of pending trace updates; also receives
     *                            the entries whose update did not succeed
     */
    private void updateTaskTraceInfo(LinkedBlockingQueue<TaskTraceInfo> linkedBlockingQueue) {
        List<TaskTraceInfo> failedUpdates = new ArrayList<>();
        TaskTraceInfo pending;
        while ((pending = linkedBlockingQueue.poll()) != null) {
            updateTaskTraceInfoStatus(pending, failedUpdates);
        }
        if (failedUpdates.isEmpty()) {
            return;
        }
        boolean interrupted = false;
        StringBuilder sb = new StringBuilder();
        for (TaskTraceInfo taskTraceInfo : failedUpdates) {
            boolean requeued = false;
            while (!requeued) {
                try {
                    linkedBlockingQueue.put(taskTraceInfo);
                    requeued = true;
                } catch (InterruptedException e) {
                    // The interrupt status is cleared when put() throws, so the retry can proceed.
                    interrupted = true;
                    logger.error("Schedule***TaskTraceFilter updateTaskTraceInfo putFailUpdateTraceInfo is interrupted, taskId : {}", taskTraceInfo.getTaskId());
                }
            }
            sb.append(taskTraceInfo.getTaskId()).append(",");
        }
        logger.info("Schedule***TaskTraceFilter failUpdateTraceInfo taskIds : {}", sb.toString());
        if (interrupted) {
            // Re-assert the interrupt so the caller can still observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Drains the given queue of executor-side trace entries. Entries whose
     * message type is {@code MessageType.BROADCASTJOB} are collected and
     * handed to {@link #insertTaskTraceInfo(LinkedBlockingQueue)}; all others
     * are applied through
     * {@link #updateTaskTraceInfoStatus(TaskTraceInfo, List)}. Entries whose
     * update did not succeed are put back on the same queue and their task
     * ids are logged.
     *
     * <p>The queue is drained with the non-blocking {@code poll()} so the
     * thread cannot block if the queue is emptied between an emptiness check
     * and the removal. If the thread is interrupted while re-queueing, the
     * {@code put} is retried so the entry is not lost, and the interrupt
     * status is restored before returning.
     *
     * @param linkedBlockingQueue queue of pending executor trace entries; also
     *                            receives the entries whose update did not succeed
     */
    private void updateExecutorTaskTraceInfo(LinkedBlockingQueue<TaskTraceInfo> linkedBlockingQueue) {
        List<TaskTraceInfo> failedUpdates = new ArrayList<>();
        LinkedBlockingQueue<TaskTraceInfo> broadcastTraces = new LinkedBlockingQueue<>();
        TaskTraceInfo pending;
        while ((pending = linkedBlockingQueue.poll()) != null) {
            if (pending.getMessageType() == MessageType.BROADCASTJOB) {
                // Local unbounded queue: add() cannot block or fail for lack of capacity.
                broadcastTraces.add(pending);
            } else {
                updateTaskTraceInfoStatus(pending, failedUpdates);
            }
        }
        if (!broadcastTraces.isEmpty()) {
            insertTaskTraceInfo(broadcastTraces);
        }
        if (failedUpdates.isEmpty()) {
            return;
        }
        boolean interrupted = false;
        StringBuilder sb = new StringBuilder();
        for (TaskTraceInfo taskTraceInfo : failedUpdates) {
            boolean requeued = false;
            while (!requeued) {
                try {
                    linkedBlockingQueue.put(taskTraceInfo);
                    requeued = true;
                } catch (InterruptedException e) {
                    // The interrupt status is cleared when put() throws, so the retry can proceed.
                    interrupted = true;
                    logger.error("Schedule***TaskTraceFilter updateTaskTraceInfo putFailUpdateTraceInfo is interrupted, taskId : {}", taskTraceInfo.getTaskId());
                }
            }
            sb.append(taskTraceInfo.getTaskId()).append(",");
        }
        logger.info("Schedule***TaskTraceFilter failUpdateTraceInfo taskIds : {}", sb.toString());
        if (interrupted) {
            // Re-assert the interrupt so the caller can still observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Appends the entry's task record to the existing {@code t_sch_tasktrace}
     * row for its task id and updates that row's modify time and status,
     * inside a {@code taskTraceFilterUpdate} transaction.
     *
     * <p>Any {@link Throwable} raised by the update is logged and not
     * propagated. When the update did not report success (either it threw or
     * {@code DB.execute} returned {@code false}), the entry is added to
     * {@code list} so the caller can retry it.
     *
     * @param taskTraceInfo the trace entry to apply
     * @param list          collector for entries whose update did not succeed
     */
    private void updateTaskTraceInfoStatus(TaskTraceInfo taskTraceInfo, List<TaskTraceInfo> list) {
        boolean updated = false;
        // try-with-resources closes the transaction handle exactly once on every path.
        try (TXHandle required = TX.required("taskTraceFilterUpdate")) {
            try {
                // NOTE(review): a false return is treated as failure — confirm DB.execute
                // returns true for a successful UPDATE.
                updated = DB.execute(Sch_Route, "UPDATE t_sch_tasktrace SET FTASKRECORD = CONCAT(FTASKRECORD, ?), FMODIFYTIME = ?, FSTATUS = ? WHERE FTASKID = ?", new Object[]{taskTraceInfo.getTaskRecord(), new Timestamp(System.currentTimeMillis()), taskTraceInfo.getStatus(), taskTraceInfo.getTaskId()});
            } catch (Throwable th) {
                logger.error("Schedule***TaskTraceFilter taskTraceFilterUpdate error", th);
            }
        }
        if (!updated) {
            list.add(taskTraceInfo);
        }
    }

    /**
     * Returns the accounts reported by
     * {@code AccountUtils.getAllAccountsOfCurrentEnv()}.
     *
     * @return the list exactly as returned by {@code AccountUtils}
     */
    private static List<Account> getAllAccountsOfCurrentEnv() {
        List<Account> accounts = AccountUtils.getAllAccountsOfCurrentEnv();
        return accounts;
    }

    /**
     * No-op in this filter: the object passed in is ignored.
     *
     * @param obj unused
     */
    @Override // kd.bos.schedule.next.observable.IObservableDataFilter
    public void handle(Object obj) {
        // Empty body: nothing is done with obj here.
    }

    /**
     * Submits this filter's {@code taskTraceFilterThread} to
     * {@code ThreadPools.executeOnce} under the name
     * {@code BOSSchedule-ObservableDataFilter-TaskTrace-Thread}.
     */
    @Override // kd.bos.schedule.next.observable.IObservableDataFilter
    public void init() {
        final String workerName = "BOSSchedule-ObservableDataFilter-TaskTrace-Thread";
        ThreadPools.executeOnce(workerName, this.taskTraceFilterThread);
    }
}
