package kd.bos.algox.flink.cluster.masterworker;

import kd.bos.algox.AlgoXException;
import kd.bos.cache.CacheFactory;
import kd.bos.cache.DistributeSessionlessCache;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.service.register.ServiceRegister;
import kd.bos.util.ConfigurationUtil;
import kd.bos.zk.ZKFactory;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.zookeeper.data.Stat;

/* loaded from: input_file:kd/bos/algox/flink/cluster/masterworker/MasterStarter.class */
public class MasterStarter extends StandaloneSessionClusterEntrypoint {
    private static Log log = LogFactory.getLog(MasterStarter.class);
    private static boolean started;

    public MasterStarter(Configuration configuration) {
        super(configuration);
    }

    public void onFatalError(Throwable th) {
        log.error("Fatal error occurred in the flink master.", th);
        log.error("Master start error", th);
        MasterRuntimeContext.setException(new AlgoXException("Master starting error.", th));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: createDispatcherResourceManagerComponentFactory, reason: merged with bridge method [inline-methods] */
    public DefaultDispatcherResourceManagerComponentFactory m24createDispatcherResourceManagerComponentFactory(Configuration configuration) {
        return super.createDispatcherResourceManagerComponentFactory(configuration);
    }

    private static boolean testMasterExists(String str) {
        try {
            return MasterWorkerClusterClient.getMasterRpcService(str).isAvailable();
        } catch (Throwable th) {
            return false;
        }
    }

    private static void clearZk(Configuration configuration, String str) {
        if (!ConfigurationUtil.getBoolean("algox.cluster.clearZk", true).booleanValue() || testMasterExists(str)) {
            return;
        }
        String string = configuration.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
        String string2 = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT);
        try {
            String str2 = string2 + "/" + string;
            CuratorFramework zKClient = ZKFactory.getZKClient(configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM));
            if (((Stat) zKClient.checkExists().forPath(str2)) != null) {
                zKClient.delete().deletingChildrenIfNeeded().forPath(str2);
            }
        } catch (Throwable th) {
            log.warn(th);
        }
    }

    public static void startup() {
        if (!Boolean.getBoolean("algox.master.enable")) {
            log.warn("Algox master is disabled.");
            return;
        }
        if (started) {
            return;
        }
        started = true;
        DistributeSessionlessCache distributeSessionlessCache = CacheFactory.getCommonCacheFactory().getDistributeSessionlessCache();
        String str = "algox-starting-" + ConfigurationUtil.getString("algox.cluster.region", "default");
        String str2 = (String) distributeSessionlessCache.get(str);
        while (str2 != null) {
            try {
                Thread.sleep(5000L);
                str2 = (String) distributeSessionlessCache.get(str);
            } catch (InterruptedException e) {
            }
        }
        distributeSessionlessCache.put(str, "" + System.currentTimeMillis(), 60);
        try {
            doStart();
            distributeSessionlessCache.remove(str);
        } catch (Throwable th) {
            distributeSessionlessCache.remove(str);
            throw th;
        }
    }

    private static void doStart() {
        try {
            Configuration loadConfiguration = MasterConfig.loadConfiguration();
            String string = ConfigurationUtil.getString("algox.cluster.region", "default");
            clearZk(loadConfiguration, string);
            EnvironmentInformation.logEnvironmentInfo(LOG, MasterStarter.class.getSimpleName(), new String[0]);
            SignalHandler.register(LOG);
            JvmShutdownSafeguard.installAsShutdownHook(LOG);
            MasterStarter masterStarter = new MasterStarter(loadConfiguration);
            MasterRuntimeContext.setConfiguration(loadConfiguration);
            masterStarter.startCluster();
            ServiceRegister.registerService("MasterRpcService", getRegisterConfig(string));
        } catch (Error e) {
            log.error("Master start error", e);
            MasterRuntimeContext.setException(new AlgoXException("Master starting error.", e));
        } catch (Exception e2) {
            log.error("Master start error", e2);
            MasterRuntimeContext.setException(e2);
        }
    }

    private static String getRegisterConfig(String str) {
        StringBuilder sb = new StringBuilder();
        sb.append("interface=kd.bos.algox.flink.rpc.MasterRpcService\n").append("class=kd.bos.algox.flink.rpc.MasterRpcServiceImpl");
        if (StringUtils.isNotEmpty(str)) {
            sb.append('\n').append("appIds=").append(str);
        }
        return sb.toString();
    }
}
