package com.kingdee.bos.qing.dpp.engine.flink.job;

import com.kingdee.bos.qing.dpp.common.interfaces.TransformNameCreator;
import com.kingdee.bos.qing.dpp.common.options.QDppOptions;
import com.kingdee.bos.qing.dpp.common.types.DataSinkType;
import com.kingdee.bos.qing.dpp.common.types.TransformType;
import com.kingdee.bos.qing.dpp.engine.flink.job.execution.ExecuteMode;
import com.kingdee.bos.qing.dpp.engine.flink.job.execution.JobParallelismAdvisor;
import com.kingdee.bos.qing.dpp.engine.flink.job.result.IJobSubmitErrorHandler;
import com.kingdee.bos.qing.dpp.engine.flink.resource.DppEngineResourceImpl;
import com.kingdee.bos.qing.dpp.engine.flink.transform.model.TransformVertex;
import com.kingdee.bos.qing.dpp.engine.flink.util.GraphUtils;
import com.kingdee.bos.qing.dpp.exception.QDataTransformException;
import com.kingdee.bos.qing.dpp.job.model.QDppJobExecuteModel;
import com.kingdee.bos.qing.dpp.job.model.QDppJobResult;
import com.kingdee.bos.qing.dpp.job.prehandle.TransformModelPreHandleHelper;
import com.kingdee.bos.qing.dpp.model.transform.TransformModel;
import com.kingdee.bos.qing.dpp.model.transform.Transformation;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.jgrapht.graph.DefaultEdge;
import org.jgrapht.graph.DirectedAcyclicGraph;

/* loaded from: input_file:com/kingdee/bos/qing/dpp/engine/flink/job/QDppJobContext.class */
/**
 * Per-job build context for a QDpp job executed on Flink.
 *
 * <p>Holds the Flink stream/table environments, the pre-handled transform model and its
 * DAG representation, plus bookkeeping used while the job graph is being built (vertex cache,
 * build-order counter, generated transform names, submit-error handlers).
 *
 * <p>{@link #initialize(QDppJobExecuteModel, boolean)} must be called before any accessor that
 * touches the environments or the job model. The class performs no synchronization.
 */
public class QDppJobContext implements TransformNameCreator {
    private StreamExecutionEnvironment streamEnv;
    private StreamTableEnvironmentImpl tableEnv;
    private DirectedAcyclicGraph<TransformVertex, DefaultEdge> directedAcyclicGraph;
    private TransformModel processModel;
    private QDppJobExecuteModel jobExecuteModel;
    private ExecuteMode executeMode;
    private boolean forQueryMeta;
    /** Counter backing {@link #newUniqueTransName()}. */
    private int newCreateTransformCounter = 0;
    private int rowLimit = -1;
    private final List<TransformVertex> lastVertexList = new ArrayList<>(1);
    /** Counter backing {@link #getNextBuildIndex()}. */
    private int buildOrderIndex = 0;
    /** Vertices cached by transform name. */
    private final Map<String, TransformVertex> transformVertexMap = new HashMap<>(3);
    private final List<IJobSubmitErrorHandler> errorHandlers = new ArrayList<>(2);

    /**
     * Creates and configures the Flink environments for the given job and converts its
     * transform model into a DAG.
     *
     * <p>As a side effect the pre-handled transform model is written back into
     * {@code jobModel} via {@code setTransformModel}.
     *
     * @param jobModel the job to build a context for
     * @param queryMetaOnly value stored as {@link #isForQueryMeta()}; when {@code true} the job
     *     parallelism is left untouched
     * @throws QDataTransformException declared for failures raised while preparing the model
     */
    public void initialize(QDppJobExecuteModel jobModel, boolean queryMetaOnly) throws QDataTransformException {
        // NOTE(review): the remote endpoint is hard-coded to localhost:6123 with no jar files;
        // confirm whether the real target is supplied elsewhere at submit time.
        this.streamEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 6123, new String[0]);
        setRowLimit(jobModel.getDataLimit());

        boolean streamMode = jobModel.isNeedExecuteInStreamMode();
        this.streamEnv.setRuntimeMode(streamMode ? RuntimeExecutionMode.STREAMING : RuntimeExecutionMode.BATCH);

        Configuration globalParams = initGlobalJobParams(jobModel);
        globalParams.setBoolean(QDppOptions.JOB_GLOBAL_PARAMS_USE_STREAM_MODE.key(), streamMode);
        this.streamEnv.getConfig().setGlobalJobParameters(globalParams);

        if (!queryMetaOnly) {
            setParallelism(jobModel);
        }
        this.forQueryMeta = queryMetaOnly;

        configureRestartStrategy(jobModel, streamMode);
        this.streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        this.tableEnv = createTableEnvironment(streamMode);

        // jobExecuteModel must be assigned before pre-handling: this context is handed to the
        // pre-handler as a TransformNameCreator and newUniqueTransName() reads jobExecuteModel.
        this.jobExecuteModel = jobModel;
        this.processModel = TransformModelPreHandleHelper.preHandleModel(queryMetaOnly, jobModel.getTransformModel(), this);
        jobModel.setTransformModel(this.processModel);
        this.directedAcyclicGraph = GraphUtils.convertFrom(this.processModel);
    }

    /**
     * Applies the restart strategy for the job; in batch mode checkpointing is also disabled.
     */
    private void configureRestartStrategy(QDppJobExecuteModel jobModel, boolean streamMode) {
        if (streamMode) {
            this.streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, Time.seconds(10L)));
            return;
        }
        boolean restartAllowed = canBeRestartOnErr(jobModel.getTransformModel())
                && !((Boolean) QDppOptions.ENGINE_LOCAL_EMBED_ENABLE.getValue()).booleanValue();
        if (restartAllowed) {
            this.streamEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10L)));
        } else {
            this.streamEnv.setRestartStrategy(RestartStrategies.noRestart());
        }
        this.streamEnv.getCheckpointConfig().disableCheckpointing();
    }

    /**
     * Builds the table environment on top of {@link #streamEnv} and applies the table-level
     * execution options (not-null enforcer and mini-batch settings).
     */
    private StreamTableEnvironmentImpl createTableEnvironment(boolean streamMode) {
        EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings.newInstance();
        EnvironmentSettings settings = streamMode
                ? settingsBuilder.inStreamingMode().build()
                : settingsBuilder.inBatchMode().build();
        // The factory is declared to return the StreamTableEnvironment interface, while this
        // class exposes the implementation type, so an explicit downcast is required.
        StreamTableEnvironmentImpl environment =
                (StreamTableEnvironmentImpl) StreamTableEnvironment.create(this.streamEnv, settings);

        Configuration tableConfig = environment.getConfig().getConfiguration();
        tableConfig.setString("table.exec.sink.not-null-enforcer", "drop");
        tableConfig.setBoolean(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true);
        tableConfig.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofMillis(1000L));
        tableConfig.setLong(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 1000L);
        return environment;
    }

    /**
     * Returns {@code false} as soon as the model contains a {@code SINK_DATA} transform whose
     * sink type is {@code JDBC}; {@code true} otherwise.
     */
    private boolean canBeRestartOnErr(TransformModel transformModel) {
        for (Transformation transformation : transformModel.getAllTransforms().values()) {
            boolean isSink = transformation.getTransformSettings().getType() == TransformType.SINK_DATA;
            if (isSink && transformation.getTransformSettings().getDataSinkType() == DataSinkType.JDBC) {
                return false;
            }
        }
        return true;
    }

    /** Returns the execute mode previously set through {@link #setExecuteMode(ExecuteMode)}. */
    public ExecuteMode getExecuteMode() {
        return this.executeMode;
    }

    /** Returns the flag passed as the second argument of {@link #initialize}. */
    public boolean isForQueryMeta() {
        return this.forQueryMeta;
    }

    public void setExecuteMode(ExecuteMode executeMode) {
        this.executeMode = executeMode;
    }

    /**
     * Adds a string entry to the job's global parameters.
     *
     * @param key parameter name
     * @param value parameter value
     */
    public void addGlobalParams(String key, String value) {
        // initialize() installs a Configuration as the global job parameters; the accessor is
        // declared with the GlobalJobParameters base type, which has no setString.
        Configuration globalParams = (Configuration) this.streamEnv.getConfig().getGlobalJobParameters();
        globalParams.setString(key, value);
    }

    /**
     * Chooses the job parallelism: a forced value wins, then the adaptive-parallelism flag,
     * and otherwise the value computed by {@link JobParallelismAdvisor}.
     */
    private void setParallelism(QDppJobExecuteModel jobModel) {
        int forced = jobModel.getForcedJobParallelism();
        if (forced > 0) {
            this.streamEnv.setParallelism(forced);
            return;
        }
        Boolean adaptive = (Boolean) jobModel.getGlobalParamValue(QDppOptions.JOB_GLOBAL_PARAMS_ADAPTIVE_PARALLELISM_ENABLE);
        if (Boolean.TRUE.equals(adaptive)) {
            // -1 corresponds to Flink's ExecutionConfig.PARALLELISM_DEFAULT sentinel.
            this.streamEnv.setParallelism(-1);
            return;
        }
        int advised = JobParallelismAdvisor.calcJobParallelism(
                jobModel.getTransformModel(), new DppEngineResourceImpl().getResourceView());
        this.streamEnv.setParallelism(advised);
    }

    /**
     * Builds the global job parameters: job name, originating host and port, followed by every
     * entry of the model's own global parameter map (a {@code null} map is treated as empty).
     */
    private Configuration initGlobalJobParams(QDppJobExecuteModel jobModel) {
        Configuration configuration = new Configuration();
        configuration.setString(QDppOptions.JOB_GLOBAL_PARAMS_JOB_NAME.key(), jobModel.getJobName());
        configuration.setString(QDppOptions.JOB_GLOBAL_PARAMS_FROM_HOST.key(), jobModel.getFromServerAddress());
        configuration.setInteger(QDppOptions.JOB_GLOBAL_PARAMS_PORT.key(), jobModel.getPort());

        Map<?, ?> extraParams = jobModel.getGlobalJobParams();
        if (extraParams != null) {
            for (Map.Entry<?, ?> entry : extraParams.entrySet()) {
                String name = (String) entry.getKey();
                Object value = entry.getValue();
                configuration.set(ConfigOptions.key(name).defaultValue(value), value);
            }
        }
        return configuration;
    }

    /** Increments the build-order counter and returns the new value (first call returns 1). */
    public int getNextBuildIndex() {
        return ++this.buildOrderIndex;
    }

    /** Returns whether the stream environment's configured runtime mode is {@code STREAMING}. */
    public boolean isExecuteInStreamMode() {
        RuntimeExecutionMode mode = (RuntimeExecutionMode) this.streamEnv.getConfiguration().get(ExecutionOptions.RUNTIME_MODE);
        return mode == RuntimeExecutionMode.STREAMING;
    }

    /** Registers a submit-error handler; {@code null} is ignored. */
    public void addJobErrorHandler(IJobSubmitErrorHandler iJobSubmitErrorHandler) {
        if (iJobSubmitErrorHandler == null) {
            return;
        }
        this.errorHandlers.add(iJobSubmitErrorHandler);
    }

    /** Passes this context and the given result to every registered handler, in registration order. */
    public void handleError(QDppJobResult qDppJobResult) {
        this.errorHandlers.forEach(handler -> handler.handleError(this, qDppJobResult));
    }

    /** Caches the vertex under its transform name, replacing any vertex cached under that name. */
    public void cacheVertex(TransformVertex transformVertex) {
        this.transformVertexMap.put(transformVertex.getTransName(), transformVertex);
    }

    /** Returns the vertex cached under {@code transName}, or {@code null} if there is none. */
    public TransformVertex getVertex(String transName) {
        return this.transformVertexMap.get(transName);
    }

    /** Returns a live view of all cached vertices. */
    public Collection<TransformVertex> getAllVertexes() {
        return this.transformVertexMap.values();
    }

    public QDppJobExecuteModel getJobExecuteModel() {
        return this.jobExecuteModel;
    }

    public void addLastVertex(TransformVertex transformVertex) {
        this.lastVertexList.add(transformVertex);
    }

    /** Returns a read-only view of the vertices added through {@link #addLastVertex}. */
    public List<TransformVertex> getLastVertexList() {
        return Collections.unmodifiableList(this.lastVertexList);
    }

    /**
     * Generates a transform name of the form {@code n<counter>} that is not yet present in the
     * job's transform model, advancing the counter past any names already taken.
     */
    public String newUniqueTransName() {
        String candidate;
        do {
            this.newCreateTransformCounter++;
            candidate = "n" + this.newCreateTransformCounter;
        } while (this.jobExecuteModel.getTransformModel().getTransform(candidate) != null);
        return candidate;
    }

    /** Returns the row limit; the field's initial value is -1 until {@link #setRowLimit} is called. */
    public int getRowLimit() {
        return this.rowLimit;
    }

    public void setRowLimit(int rowLimit) {
        this.rowLimit = rowLimit;
    }

    public StreamExecutionEnvironment getStreamEnv() {
        return this.streamEnv;
    }

    public StreamTableEnvironmentImpl getTableEnv() {
        return this.tableEnv;
    }

    public DirectedAcyclicGraph<TransformVertex, DefaultEdge> getDirectedAcyclicGraph() {
        return this.directedAcyclicGraph;
    }

    public void setDirectedAcyclicGraph(DirectedAcyclicGraph<TransformVertex, DefaultEdge> directedAcyclicGraph) {
        this.directedAcyclicGraph = directedAcyclicGraph;
    }
}
