package kd.bos.schedule.message.zk;

import com.alibaba.fastjson.JSON;
import java.util.Iterator;
import java.util.List;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.schedule.api.MessageInfo;
import kd.bos.schedule.api.MessageType;
import kd.bos.schedule.message.AbstractMessageWatcher;
import kd.bos.schedule.message.MessageFlag;
import kd.bos.schedule.zk.ActiveKeyValueStore;
import kd.bos.schedule.zk.ListGroupForever;
import kd.bos.schedule.zk.ZkConfig;
import org.apache.zookeeper.Watcher;

/* loaded from: input_file:kd/bos/schedule/message/zk/ZkMessageWatcher.class */
/**
 * Message watcher backed by the ZooKeeper key/value store.
 *
 * <p>On {@link #start()} the watcher writes a timestamp node at
 * {@code ZkConfig.getJobRootPath() + "/" + runAt}, removes stale job nodes left under that
 * path, and then starts a {@link ListGroupForever} monitor that dispatches newly listed
 * messages when the children of that path change.
 *
 * <p>{@link #setRunAt(String)} should be called before {@link #start()}; otherwise the
 * watched path is built from a {@code null} suffix.
 */
public class ZkMessageWatcher extends AbstractMessageWatcher {
    private static final Log log = LogFactory.getLog("kd.bos.schedule.message.zk.ZkMessageWatcher");

    // NOTE(review): not referenced in this class; presumably milliseconds (2 hours) — confirm
    // against subclasses. Left non-final because subclasses may assign it.
    protected static int EXPIRED_TIME = 7200000;

    /** Store handle; created in {@link #start()}. */
    protected ActiveKeyValueStore zkStore = null;
    /** Children monitor for {@link #executorJobRootPath}; created in {@code startMonitor()}. */
    private ListGroupForever jobMonitor = null;
    /** Job root path joined with {@link #runAt}; set in {@link #start()}. */
    private String executorJobRootPath = null;
    /** Path suffix identifying where this watcher runs; see {@link #setRunAt(String)}. */
    private String runAt = null;

    /**
     * Creates the store, starts the parent watcher, registers this executor's root node
     * (value: current time in millis), cleans up stale jobs and starts the children monitor.
     */
    @Override // kd.bos.schedule.message.AbstractMessageWatcher
    public void start() {
        this.zkStore = ActiveKeyValueStore.create();
        super.start();
        this.executorJobRootPath = ZkConfig.getJobRootPath() + "/" + this.runAt;
        this.zkStore.write(this.executorJobRootPath, String.valueOf(System.currentTimeMillis()));
        releaseDirtyJob();
        startMonitor();
    }

    /**
     * Deletes job nodes under the executor root that are flagged HOLDING or SCHEDULE and
     * whose task-status path has no children.
     *
     * <p>Best-effort: failures are logged and never propagated. Each child is handled in
     * isolation so that one unreadable node does not prevent cleanup of the others.
     */
    private void releaseDirtyJob() {
        try {
            for (String child : this.zkStore.getChildren(this.executorJobRootPath)) {
                try {
                    releaseIfDirty(child);
                } catch (Exception e) {
                    // Keep going: a single bad node must not abort the whole cleanup.
                    log.error(e);
                }
            }
        } catch (Exception e) {
            log.error(e);
        }
    }

    /**
     * Removes a single job node (and its task-status node) when it qualifies as stale.
     *
     * @param child name of the child node under the executor root
     * @throws Exception if the underlying store operations fail
     */
    private void releaseIfDirty(String child) throws Exception {
        String jobPath = this.executorJobRootPath + "/" + child;
        if (this.zkStore.exists(jobPath) == null) {
            return;
        }
        MessageInfo messageInfo = readMessage(jobPath);
        if (messageInfo == null) {
            // Node content could not be parsed into a message; nothing to decide on.
            return;
        }
        boolean pending = MessageFlag.HOLDING.equals(messageInfo.getFlag())
                || MessageFlag.SCHEDULE.equals(messageInfo.getFlag());
        if (pending && this.zkStore.getChildren(ZkConfig.getTaskStatusPath(child)).isEmpty()) {
            this.zkStore.delete(ZkConfig.getTaskStatusPath(child));
            this.zkStore.delete(jobPath);
        }
    }

    /** Starts the monitor that forwards children-changed events to {@link #processMsg(List)}. */
    private void startMonitor() {
        this.jobMonitor = new ListGroupForever(this.executorJobRootPath, (list, eventType) -> {
            log.info("Event Type:" + eventType);
            if (eventType == Watcher.Event.EventType.NodeChildrenChanged) {
                processMsg(list);
            }
        });
        this.jobMonitor.start();
    }

    /**
     * Dispatches every listed message whose flag is unset or SCHEDULE, after marking it
     * HOLDING in the store.
     *
     * <p>Each message is processed in isolation: a failure on one node is logged and the
     * remaining nodes are still processed.
     *
     * @param list child node names under the executor root; {@code null} is treated as empty
     */
    private void processMsg(List<String> list) {
        log.debug("get message");
        if (list == null) {
            return;
        }
        for (String child : list) {
            String path = this.executorJobRootPath + "/" + child;
            try {
                if (this.zkStore.exists(path) == null) {
                    continue;
                }
                MessageInfo messageInfo = readMessage(path);
                if (messageInfo == null) {
                    // Empty or unparsable node content; skip instead of failing the batch.
                    continue;
                }
                if (messageInfo.getFlag() == null || messageInfo.getFlag().equals(MessageFlag.SCHEDULE)) {
                    // Persist HOLDING before dispatching so a later event does not pick it up again.
                    messageInfo.setFlag(MessageFlag.HOLDING);
                    this.zkStore.write(path, JSON.toJSONString(messageInfo));
                    dispatch(messageInfo, null);
                }
            } catch (Exception e) {
                log.error(e);
            }
        }
    }

    /**
     * Reads the node at {@code path} and parses its content as a {@link MessageInfo}.
     *
     * @param path absolute node path
     * @return the parsed message, or {@code null} if the parser yields no object
     */
    private MessageInfo readMessage(String path) {
        return (MessageInfo) JSON.parseObject(this.zkStore.read(path, null), MessageInfo.class);
    }

    /**
     * Stops the parent watcher and, if it was started, the children monitor.
     * Safe to call when {@link #start()} never ran or did not get as far as the monitor.
     */
    @Override // kd.bos.schedule.message.AbstractMessageWatcher
    public void stop() {
        super.stop();
        if (this.jobMonitor != null) {
            this.jobMonitor.stop();
        }
    }

    /**
     * Sets the path suffix appended to the job root path in {@link #start()}.
     *
     * @param str suffix value
     */
    public void setRunAt(String str) {
        this.runAt = str;
    }

    /**
     * @return the path suffix set via {@link #setRunAt(String)}, or {@code null} if unset
     */
    public String getRunAt() {
        return this.runAt;
    }

    /**
     * Intentionally does nothing in this implementation.
     *
     * @param messageType ignored
     */
    public void startToWatchMessage(MessageType messageType) {
    }
}
