package kd.bos.schedule.next.observable;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.schedule.next.observable.model.ObservableModel;
import kd.bos.threads.ThreadPools;

/* loaded from: input_file:kd/bos/schedule/next/observable/ObservableLogHandler.class */
public class ObservableLogHandler implements Runnable {
    private static final Log logger = LogFactory.getLog(ObservableLogHandler.class);
    private static ConcurrentHashMap<String, IObservableDataFilter> filterList = new ConcurrentHashMap<>();
    private static ExecutorService executor = ThreadPools.newCachedExecutorService("BOSSchedule-TraceLogHandler", 1, 1);
    private static LinkedBlockingQueue observableQueue = new LinkedBlockingQueue();

    @Override // java.lang.Runnable
    public void run() {
        execute();
    }

    public void execute() {
        while (true) {
            try {
                ObservableModel observableModel = (ObservableModel) observableQueue.take();
                for (Map.Entry<String, IObservableDataFilter> entry : filterList.entrySet()) {
                    try {
                        entry.getValue().handle(observableModel);
                    } catch (Throwable th) {
                        logger.error("Schedule***observableFilter error,filterName : {}", entry.getKey(), th);
                    }
                }
            } catch (Exception e) {
                logger.error("Error:Schedule***ObservableLogHandler execute error", e);
            }
        }
    }

    public static ConcurrentHashMap<String, IObservableDataFilter> getFilters() {
        return filterList;
    }

    public static IObservableDataFilter getFilters(String str) {
        if (filterList.containsKey(str)) {
            return filterList.get(str);
        }
        return null;
    }

    public static void removeFilter(String str) {
        if (filterList.containsKey(str)) {
            filterList.remove(str);
        }
    }

    public static void addFilter(String str) throws ClassNotFoundException, InstantiationException, IllegalAccessException {
        try {
            IObservableDataFilter iObservableDataFilter = (IObservableDataFilter) Class.forName(str).newInstance();
            long currentTimeMillis = System.currentTimeMillis();
            iObservableDataFilter.init();
            logger.info("Schedule***init time:" + str + ",cost:" + (System.currentTimeMillis() - currentTimeMillis) + " ms");
            filterList.put(str, iObservableDataFilter);
        } catch (Exception e) {
            logger.error("Error:Schedule***ObservableLogHandler addFilter error", e);
        }
    }

    public static void collectData(ObservableModel observableModel) {
        if (observableModel != null) {
            try {
                observableQueue.put(observableModel);
            } catch (InterruptedException e) {
                logger.error("Error:Schedule***TraceLogHandler", e);
            }
        }
    }

    static {
        try {
            if (Boolean.getBoolean("Schedule.disableToWork")) {
                logger.info("Schedule***当前节点禁用所有调度服务");
            } else {
                addFilter("kd.bos.schedule.next.observable.filter.SchTaskStatusNumFilter");
                addFilter("kd.bos.schedule.next.observable.filter.Top5ScheduleTaskNumFilter");
                addFilter("kd.bos.schedule.next.observable.filter.SchTaskJobTypeNumFilter");
                addFilter("kd.bos.schedule.next.observable.filter.DetectTaskDataFilter");
                addFilter("kd.bos.schedule.next.observable.filter.TaskTraceFilter");
                executor.submit(new ObservableLogHandler());
            }
        } catch (Throwable th) {
            logger.error("Error:Schedule***ObservableLogHandler init", th);
        }
    }
}
