package kd.bos.algox.flink.jobclient;

import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.concurrent.TimeUnit;
import kd.bos.algo.AlgoException;
import kd.bos.algox.AlgoXCallBack;
import kd.bos.algox.CommitTimeoutException;
import kd.bos.algox.JobNotFoundException;
import kd.bos.algox.JobProgressListener;
import kd.bos.algox.RunningTimeoutException;
import kd.bos.algox.cluster.ClusterClient;
import kd.bos.algox.cluster.ClusterFactory;
import kd.bos.algox.core.JobContext;
import kd.bos.algox.jobclient.JobClient;
import kd.bos.algox.jobclient.JobDetail;
import kd.bos.algox.jobclient.JobStatus;
import kd.bos.algox.util.ReflectCallUtil;
import kd.bos.db.splittingread.ThreadReadWriteContext;
import kd.bos.exception.KDException;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.util.ConfigurationUtil;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.util.SerializedThrowable;

/* loaded from: input_file:kd/bos/algox/flink/jobclient/FlinkJobClient.class */
/**
 * {@link JobClient} implementation that submits jobs through the {@link ClusterClient}
 * obtained from {@link ClusterFactory} and then polls the job detail until the job
 * finishes, fails, or the running timeout elapses.
 *
 * <p>Progress transitions are reported to {@link JobProgressListener}.
 */
public class FlinkJobClient implements JobClient {
    /** Pause between two polls, in milliseconds (config key {@code algox.job.await.interval}, default 100). */
    private static final int waitInterval = ConfigurationUtil.getInteger("algox.job.await.interval", 100).intValue();
    private static final Log log = LogFactory.getLog(FlinkJobClient.class);

    /**
     * Copies the read/write mode of the calling thread, when one is set, onto the job context.
     *
     * @param jobContext job context that receives the mode name
     */
    private void setReadWriteModel(JobContext jobContext) {
        ThreadReadWriteContext threadReadWriteContext = ThreadReadWriteContext.get();
        if (threadReadWriteContext != null) {
            jobContext.setReadWriteMode(threadReadWriteContext.getMode().name());
        }
    }

    /**
     * Returns the name used when reporting progress: the job title, or the job name
     * when no title is set.
     */
    private static String displayName(JobContext jobContext) {
        String title = jobContext.getTitle();
        return title == null ? jobContext.getJobName() : title;
    }

    /**
     * Computes the absolute deadline {@code now + timeoutMillis}, saturating at
     * {@link Long#MAX_VALUE} instead of overflowing to a value in the past.
     */
    private static long deadlineAfter(long timeoutMillis) {
        long now = System.currentTimeMillis();
        long deadline = now + timeoutMillis;
        if (timeoutMillis > 0 && deadline < now) {
            return Long.MAX_VALUE;
        }
        return deadline;
    }

    /**
     * Submits the job and blocks until it finishes, fails, or the running timeout elapses.
     * The job context is always cleared before this method returns or throws.
     *
     * @param jobContext job to submit
     * @param i          value converted to seconds and handed to {@code submitJob}
     *                   (presumably the commit timeout - confirm against ClusterClient)
     * @param i2         running timeout, in {@code timeUnit}
     * @param timeUnit   unit of {@code i} and {@code i2}
     * @throws RunningTimeoutException when the job does not complete within the running timeout
     */
    public void commit(JobContext jobContext, int i, int i2, TimeUnit timeUnit) throws CommitTimeoutException, RunningTimeoutException {
        try {
            setReadWriteModel(jobContext);
            Object jobId = ClusterFactory.getFactory().getClusterClient().submitJob(jobContext, (int) timeUnit.toSeconds(i));
            long runningTimeoutMillis = timeUnit.toMillis(i2);
            syncWaitJob(jobId, displayName(jobContext), jobContext.getRegion(), runningTimeoutMillis);
        } finally {
            jobContext.clear();
        }
    }

    /**
     * Submits the job and schedules a background wait that reports the outcome to the callback.
     * The job context is always cleared before this method returns or throws.
     *
     * @param jobContext    job to submit
     * @param i             value converted to seconds and handed to {@code submitJob}
     *                      (presumably the commit timeout - confirm against ClusterClient)
     * @param i2            running timeout, in {@code timeUnit}
     * @param timeUnit      unit of {@code i} and {@code i2}
     * @param algoXCallBack callback notified on completion or failure; may be {@code null}
     * @return string form of the object returned by {@code submitJob}
     */
    public String asyncCommit(JobContext jobContext, int i, int i2, TimeUnit timeUnit, AlgoXCallBack algoXCallBack) {
        try {
            setReadWriteModel(jobContext);
            Object jobId = ClusterFactory.getFactory().getClusterClient().submitJob(jobContext, (int) timeUnit.toSeconds(i));
            long runningTimeoutMillis = timeUnit.toMillis(i2);
            asyncWaitJob(jobId, displayName(jobContext), jobContext.getRegion(), runningTimeoutMillis, algoXCallBack);
            return String.valueOf(jobId);
        } finally {
            jobContext.clear();
        }
    }

    /**
     * Polls the job detail on the calling thread until the job finishes, fails, or the
     * timeout elapses.
     *
     * <p>A {@link JobNotFoundException} raised while fetching the detail is tolerated and
     * polling continues. An interrupt received while sleeping does not abort the wait;
     * the interrupt flag is restored before this method exits.
     *
     * @param obj  job identifier as returned by {@code submitJob}
     * @param str  job name passed to {@link JobProgressListener}
     * @param str2 region passed to the cluster client and the listener
     * @param j    running timeout in milliseconds
     * @throws RunningTimeoutException when the deadline passes before a terminal status is seen
     */
    public static void syncWaitJob(Object obj, String str, String str2, long j) {
        JobProgressListener.start(obj, str, str2);
        ClusterClient clusterClient = ClusterFactory.getFactory().getClusterClient();
        long deadline = deadlineAfter(j);
        boolean interrupted = false;
        try {
            do {
                JobDetail failedDetail = null;
                try {
                    JobDetail jobDetail = clusterClient.getJobDetail(obj, str2);
                    if (jobDetail != null) {
                        JobStatus status = jobDetail.getStatus();
                        if (status == JobStatus.FINISHED) {
                            JobProgressListener.finished(obj, str, str2);
                            return;
                        }
                        if (status == JobStatus.FAILED) {
                            // Handled after the try so the failure is neither reported twice
                            // nor filtered by the job-not-found tolerance below.
                            failedDetail = jobDetail;
                        }
                    }
                } catch (RuntimeException e) {
                    // NOTE(review): the listener is notified even when the exception is a
                    // tolerated job-not-found; kept as-is, confirm this is intended.
                    JobProgressListener.failed(obj, str, str2);
                    if (!isJobNotFoundException(e)) {
                        throw e;
                    }
                }
                if (failedDetail != null) {
                    JobProgressListener.failed(obj, str, str2);
                    RuntimeException failure = wrapException(failedDetail.getException(), obj);
                    log.error("AlgoXError:", failure);
                    throw failure;
                }
                try {
                    Thread.sleep(waitInterval);
                } catch (InterruptedException e) {
                    // Keep waiting as before, but remember the interrupt so it is not lost.
                    interrupted = true;
                }
            } while (System.currentTimeMillis() < deadline);
            JobProgressListener.failed(obj, str, str2);
            throw new RunningTimeoutException("Timeout when execute Job " + obj);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Converts a job failure into the runtime exception that is thrown to the caller.
     *
     * <p>The cause chain is walked, unwrapping {@link SerializedThrowable} and
     * {@link InvocationTargetException}. The first {@link AlgoException} or
     * {@link KDException} found is returned as is. A throwable whose simple class name
     * contains {@code "Timeout"} becomes a {@link RunningTimeoutException}. When the chain
     * ends without a match, the original throwable is wrapped in an {@link AlgoException}.
     *
     * @param th  failure reported for the job; may be {@code null}
     * @param obj job identifier used in the message
     * @return exception to throw, never {@code null}
     */
    private static RuntimeException wrapException(Throwable th, Object obj) {
        Throwable current = th;
        while (current != null) {
            if ((current instanceof AlgoException) || (current instanceof KDException)) {
                return (RuntimeException) current;
            }
            Throwable next;
            if (current instanceof SerializedThrowable) {
                Throwable restored = ((SerializedThrowable) current).deserializeError((ClassLoader) null);
                // deserializeError hands back the wrapper itself when the original cannot be
                // restored; follow the cause instead of re-processing the same object forever.
                next = restored != current ? restored : current.getCause();
            } else if (current instanceof InvocationTargetException) {
                next = ((InvocationTargetException) current).getTargetException();
            } else {
                if (current.getClass().getSimpleName().contains("Timeout")) {
                    return new RunningTimeoutException("Error when execute Job " + obj, current);
                }
                Throwable cause = current.getCause();
                if (cause == current) {
                    return new AlgoException("Error when execute Job " + obj, current);
                }
                next = cause;
            }
            current = next;
        }
        return new AlgoException("Error when execute Job " + obj, th);
    }

    /**
     * Tells whether the throwable or any of its causes is a {@link JobNotFoundException}.
     */
    private static boolean isJobNotFoundException(Throwable th) {
        for (Throwable current = th; current != null; current = current.getCause()) {
            if (current instanceof JobNotFoundException) {
                return true;
            }
        }
        return false;
    }

    /**
     * Schedules a background task, via {@link ReflectCallUtil#executeOnceIncludeRequestContext},
     * that polls the job detail until the job finishes, fails, or the timeout elapses, and
     * reports the outcome to the callback. The deadline is computed on the calling thread,
     * before the task is scheduled.
     *
     * @param obj           job identifier as returned by {@code submitJob}
     * @param str           job name passed to {@link JobProgressListener}
     * @param str2          region passed to the cluster client and the listener
     * @param j             running timeout in milliseconds
     * @param algoXCallBack callback notified on completion or failure; may be {@code null}
     */
    public static void asyncWaitJob(final Object obj, final String str, final String str2, long j, final AlgoXCallBack algoXCallBack) {
        final long deadline = deadlineAfter(j);
        ReflectCallUtil.executeOnceIncludeRequestContext("AlgoX" + obj, new Runnable() {
            @Override
            public void run() {
                pollAndNotify(obj, str, str2, deadline, algoXCallBack);
            }
        });
    }

    /**
     * Body of the background wait started by {@link #asyncWaitJob}.
     *
     * <p>Any exception raised while polling, including a failed job, ends the wait and is
     * passed to {@code onFailed}. An interrupt received while sleeping does not abort the
     * wait; the interrupt flag is restored before this method exits.
     */
    private static void pollAndNotify(Object obj, String str, String str2, long deadline, AlgoXCallBack algoXCallBack) {
        JobProgressListener.start(obj, str, str2);
        ClusterClient clusterClient = ClusterFactory.getFactory().getClusterClient();
        boolean interrupted = false;
        try {
            do {
                try {
                    JobDetail jobDetail = clusterClient.getJobDetail(obj, str2);
                    if (jobDetail != null) {
                        JobStatus status = jobDetail.getStatus();
                        if (status == JobStatus.FINISHED) {
                            JobProgressListener.finished(obj, str, str2);
                            if (algoXCallBack != null) {
                                algoXCallBack.onFinished();
                            }
                            return;
                        }
                        if (status == JobStatus.FAILED) {
                            Throwable failure = jobDetail.getException();
                            if (failure instanceof AlgoException) {
                                throw (AlgoException) failure;
                            }
                            throw new AlgoException("Error when execute Job " + obj, failure);
                        }
                    }
                    try {
                        Thread.sleep(waitInterval);
                    } catch (InterruptedException e) {
                        // Keep waiting as before, but remember the interrupt so it is not lost.
                        interrupted = true;
                    }
                } catch (Exception e) {
                    // NOTE(review): unlike syncWaitJob, a job-not-found error is not tolerated
                    // here and ends the wait; confirm whether the two should agree.
                    JobProgressListener.failed(obj, str, str2);
                    if (algoXCallBack != null) {
                        algoXCallBack.onFailed(e);
                    }
                    return;
                }
            } while (System.currentTimeMillis() < deadline);
            JobProgressListener.failed(obj, str, str2);
            if (algoXCallBack != null) {
                algoXCallBack.onFailed(new RunningTimeoutException("Timeout when execute Job " + obj));
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns the share of job vertices whose execution state is {@code FINISHED}, as a
     * whole-number percentage (ratio rounded half-up to two decimals, then multiplied by 100).
     *
     * @param str job identifier passed to {@code AlgoXJobDetailsInfoUtil.getJobDetailsInfo}
     * @return percentage between 0 and 100; 0 when no job details are available or the job
     *         has no vertices
     */
    public int getJobProgress(String str) {
        JobDetailsInfo jobDetailsInfo = AlgoXJobDetailsInfoUtil.getJobDetailsInfo(str);
        if (jobDetailsInfo == null) {
            return 0;
        }
        int totalVertices = jobDetailsInfo.getJobVertexInfos().size();
        if (totalVertices == 0) {
            // Nothing to divide by; report no progress instead of failing with ArithmeticException.
            return 0;
        }
        long finishedVertices = jobDetailsInfo.getJobVertexInfos().stream()
                .filter(vertex -> vertex.getExecutionState() == ExecutionState.FINISHED)
                .count();
        return BigDecimal.valueOf(finishedVertices)
                .divide(BigDecimal.valueOf(totalVertices), 2, RoundingMode.HALF_UP)
                .multiply(BigDecimal.valueOf(100))
                .intValue();
    }

    /**
     * Delegates to {@code AlgoXJobDetailsInfoUtil.checkJobExists}.
     *
     * @param str job identifier
     * @return the result reported by the utility
     */
    public boolean checkJobExists(String str) {
        return AlgoXJobDetailsInfoUtil.checkJobExists(str);
    }
}
