package com.kingdee.bos.qing.dpp.engine.flink.transform;

import com.kingdee.bos.qing.dpp.common.annotations.Transformer;
import com.kingdee.bos.qing.dpp.common.log.DppLogger;
import com.kingdee.bos.qing.dpp.common.types.DppDataType;
import com.kingdee.bos.qing.dpp.common.types.TransformType;
import com.kingdee.bos.qing.dpp.datasource.input.AbstractSourceDataInput;
import com.kingdee.bos.qing.dpp.datasource.input.QueryOption;
import com.kingdee.bos.qing.dpp.datasource.input.SourceInputFactory;
import com.kingdee.bos.qing.dpp.datasource.remote.RemoteSourceInputProxy;
import com.kingdee.bos.qing.dpp.engine.flink.transform.model.TransformVertex;
import com.kingdee.bos.qing.dpp.engine.flink.transform.source.QDppSourceFunction;
import com.kingdee.bos.qing.dpp.engine.flink.util.FlinkDataTypeUtils;
import com.kingdee.bos.qing.dpp.engine.optimization.util.FilterUtil;
import com.kingdee.bos.qing.dpp.exception.QDppSourceException;
import com.kingdee.bos.qing.dpp.exception.TableBuildException;
import com.kingdee.bos.qing.dpp.job.model.QDppJobExecuteModel;
import com.kingdee.bos.qing.dpp.model.filters.DppRuntimeFilter;
import com.kingdee.bos.qing.dpp.model.filters.IRuntimeFilter;
import com.kingdee.bos.qing.dpp.model.schema.DppField;
import com.kingdee.bos.qing.dpp.model.schema.SourceInputSchema;
import com.kingdee.bos.qing.dpp.model.transform.Transformation;
import com.kingdee.bos.qing.dpp.model.transform.settings.InputSourceSettings;
import com.kingdee.bos.qing.dpp.model.transform.source.AbstractDppSource;
import com.kingdee.bos.qing.util.CollectionUtils;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.slf4j.LoggerFactory;

@Transformer(TransformType.SOURCE_INPUT)
/* loaded from: input_file:com/kingdee/bos/qing/dpp/engine/flink/transform/SourceTransformer.class */
public class SourceTransformer extends BaseTransformer {
    private static final DppLogger log = new DppLogger("Qing-Dpp:", LoggerFactory.getLogger(SourceTransformer.class));

    /**
     * Builds the Flink {@link Table} for a source-input vertex and stores it on the vertex.
     *
     * <p>Steps, in order: build the source description from the transformation settings,
     * populate a {@link QueryOption} from the job model (remote address, port, query timeout,
     * row limit, job name), attach the push-down filter and the selected fields, wrap a
     * {@link QDppSourceFunction} in a data stream, convert it to a table, and finally apply
     * the filter that is not pushed down (if any) on the table itself.
     *
     * @param list            vertices passed in by the base transformer; not used here
     * @param transformVertex the vertex whose table is built and set
     * @throws TableBuildException if a {@link QDppSourceException} is raised while building
     *                             the source row type or the table
     */
    @Override // com.kingdee.bos.qing.dpp.engine.flink.transform.BaseTransformer
    protected void internalBuild(List<TransformVertex> list, TransformVertex transformVertex) throws TableBuildException {
        QDppJobExecuteModel jobExecuteModel = this.jobContext.getJobExecuteModel();
        Transformation transformation = transformVertex.getTransformation();
        // Explicit cast, consistent with internalInit.
        InputSourceSettings transformSettings = (InputSourceSettings) transformation.getTransformSettings();
        AbstractDppSource buildSource = transformSettings.buildSource(transformation.getName());

        QueryOption queryOption = new QueryOption();
        queryOption.setRemoteServerIp(jobExecuteModel.getFromServerAddress());
        queryOption.setRemoteServerPort(jobExecuteModel.getPort());
        queryOption.setQueryTimeout(jobExecuteModel.getQueryTimeout());
        queryOption.setDataLimit(this.jobContext.getRowLimit());
        queryOption.setJobName(jobExecuteModel.getJobName());

        bindPushDownFilter(transformSettings, transformVertex, queryOption);
        Set<String> selectedFieldNames = resolveSelectedFields(transformSettings, transformVertex, queryOption);

        try {
            RowTypeInfo sourceRowTypeInfo = getSourceRowTypeInfo(transformVertex.getFields(), transformVertex.getTransName());
            QDppSourceFunction qDppSourceFunction = new QDppSourceFunction(sourceRowTypeInfo, buildSource, queryOption);
            this.jobContext.getStreamEnv().clean(qDppSourceFunction);
            Boundedness boundedness = this.jobContext.isExecuteInStreamMode() ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;
            // The element type of QDppSourceFunction is not visible here, so the stream is left raw.
            @SuppressWarnings({"rawtypes", "unchecked"})
            Table sourceTable = this.jobContext.getTableEnv().fromDataStream(
                    new DataStreamSource(this.jobContext.getStreamEnv(), sourceRowTypeInfo, new StreamSource(qDppSourceFunction), false, transformation.getName(), boundedness),
                    buildTableSchema(transformVertex, selectedFieldNames));
            IRuntimeFilter unPushDownFilter = transformSettings.getUnPushDownFilter();
            if (unPushDownFilter != null) {
                // Whatever could not be pushed to the source is applied on the table.
                sourceTable = sourceTable.filter(FilterTransformer.buildExpression(unPushDownFilter, transformVertex));
            }
            transformVertex.setTable(sourceTable);
        } catch (QDppSourceException e) {
            throw new TableBuildException(e);
        }
    }

    /**
     * Attaches the push-down filter of the settings (if any) to the query option. When any
     * extracted filter condition has no field yet, the vertex fields are first bound to the
     * filter, keyed by their full field name.
     */
    private void bindPushDownFilter(InputSourceSettings transformSettings, TransformVertex transformVertex, QueryOption queryOption) {
        IRuntimeFilter pushDownFilter = transformSettings.getPushDownFilter();
        if (pushDownFilter == null) {
            return;
        }
        if (isFieldNull(pushDownFilter)) {
            Map<String, DppField> fieldsByFullName = new HashMap<>(16);
            for (DppField field : transformVertex.getFields()) {
                fieldsByFullName.put(field.getFullFieldName(), field);
            }
            FilterUtil.setFilterField(pushDownFilter, fieldsByFullName);
        }
        queryOption.setFilter(pushDownFilter);
    }

    /**
     * Registers the selected fields that the vertex knows about on the query option.
     *
     * @return the selected full field names that exist on the vertex, or {@code null} when the
     *         settings select no fields (callers treat {@code null} as "no restriction")
     */
    private Set<String> resolveSelectedFields(InputSourceSettings transformSettings, TransformVertex transformVertex, QueryOption queryOption) {
        String[] selectedFields = transformSettings.getSelectedFields();
        if (selectedFields == null || selectedFields.length < 1) {
            return null;
        }
        boolean someMissing = false;
        Set<String> existingNames = new HashSet<>(selectedFields.length);
        for (String selected : selectedFields) {
            if (null == transformVertex.getOriginalField(selected)) {
                someMissing = true;
            } else {
                existingNames.add(selected);
            }
        }
        if (someMissing) {
            log.info("some selected fields not exist, these fields would not build for pushdown");
        }
        // Iterate the vertex fields so the query option receives them in vertex order.
        for (DppField field : transformVertex.getFields()) {
            if (existingNames.contains(field.getFullFieldName())) {
                queryOption.addField(field.getOriginalName());
            }
        }
        return existingNames;
    }

    /**
     * Reports whether at least one condition returned by {@link FilterUtil#extractField}
     * for the given filter has a {@code null} field.
     *
     * @param iRuntimeFilter the filter to inspect
     * @return {@code true} if some extracted condition has no field; {@code false} when
     *         nothing is extracted or every condition already has a field
     */
    private boolean isFieldNull(IRuntimeFilter iRuntimeFilter) {
        List<DppRuntimeFilter> conditions = FilterUtil.extractField(iRuntimeFilter);
        if (!CollectionUtils.isEmpty(conditions)) {
            for (DppRuntimeFilter condition : conditions) {
                if (condition.getField() == null) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Creates the table {@link Schema} for the vertex, one column per retained field.
     * Columns are named by the field's table-unique name and typed through
     * {@link FlinkDataTypeUtils#getTargetInputDataType}.
     *
     * @param transformVertex the vertex providing the fields
     * @param set             full field names to retain, or {@code null} to retain every field
     * @return the built schema
     */
    private Schema buildTableSchema(TransformVertex transformVertex, Set<String> set) throws TableBuildException {
        Schema.Builder schemaBuilder = Schema.newBuilder();
        for (DppField field : transformVertex.getFields()) {
            String fullName = field.getFullFieldName();
            boolean filteredOut = set != null && !set.contains(fullName);
            if (filteredOut) {
                continue;
            }
            schemaBuilder.column(field.getTableUniqueFieldName(), FlinkDataTypeUtils.getTargetInputDataType(field));
        }
        return schemaBuilder.build();
    }

    /**
     * Derives the {@link RowTypeInfo} of the source rows from the given fields. Each column
     * is named by the field's table-unique name and typed from the field's original data type.
     *
     * @param list the fields, in column order
     * @param str  the source name, used only in the error message
     * @return row type information covering every field in {@code list}
     * @throws QDppSourceException if no type information is available for a field's type
     */
    private RowTypeInfo getSourceRowTypeInfo(List<DppField> list, String str) throws QDppSourceException {
        int columnCount = list.size();
        TypeInformation<?>[] columnTypes = new TypeInformation<?>[columnCount];
        String[] columnNames = new String[columnCount];
        for (int index = 0; index < columnCount; index++) {
            DppField field = list.get(index);
            DppDataType originalType = field.getOriginalDppDataType();
            String columnName = field.getTableUniqueFieldName();
            TypeInformation<?> columnType = FlinkDataTypeUtils.getTypeInformation(originalType, field);
            if (columnType == null) {
                throw new QDppSourceException("unsupported field type:" + originalType + ",fieldName:" + columnName + ", from source:" + str);
            }
            columnTypes[index] = columnType;
            columnNames[index] = columnName;
        }
        return new RowTypeInfo(columnTypes, columnNames);
    }

    /**
     * Initializes a source-input vertex by reading the schema of its source and registering
     * the fields on the vertex.
     *
     * <p>The source input is opened with a metadata-only {@link QueryOption}, its schema is
     * read, and the input is closed exactly once before the fields are processed. A field is
     * registered when the settings select no fields at all, or when its full field name is
     * among the selected fields.
     *
     * @param list            vertices passed in by the base transformer; not used here
     * @param transformVertex the vertex that receives the field relations
     * @throws TableBuildException if the source input raises a {@link QDppSourceException}
     *                             while being created, opened, or queried for its schema
     */
    @Override // com.kingdee.bos.qing.dpp.engine.flink.transform.BaseTransformer
    protected void internalInit(List<TransformVertex> list, TransformVertex transformVertex) throws TableBuildException {
        QDppJobExecuteModel jobExecuteModel = this.jobContext.getJobExecuteModel();
        Transformation transformation = transformVertex.getTransformation();
        InputSourceSettings inputSourceSettings = (InputSourceSettings) transformation.getTransformSettings();
        AbstractDppSource buildSource = inputSourceSettings.buildSource(transformation.getName());

        QueryOption queryOption = new QueryOption();
        queryOption.setRemoteServerIp(jobExecuteModel.getFromServerAddress());
        queryOption.setRemoteServerPort(jobExecuteModel.getPort());
        queryOption.setOnlyQueryMeta(true);
        queryOption.setDataLimit(this.jobContext.getRowLimit());
        queryOption.setJobName(jobExecuteModel.getJobName());

        SourceInputSchema inputSchema;
        AbstractSourceDataInput sourceInput = null;
        try {
            sourceInput = buildSource.isExecuteInEngine()
                    ? SourceInputFactory.newLocalSourceInput(buildSource.getConnectType())
                    : new RemoteSourceInputProxy(jobExecuteModel.getRpcVersion());
            sourceInput.open(buildSource, queryOption);
            inputSchema = sourceInput.getInputSchema();
        } catch (QDppSourceException e) {
            throw new TableBuildException("can not open source input", e);
        } finally {
            // Single release point: the input is closed once on success and on failure,
            // instead of once on the success path and again in a catch-all handler.
            if (sourceInput != null) {
                sourceInput.close();
            }
        }

        // null means "no explicit selection": every schema field is registered.
        Map<String, Boolean> selectedNames = getFieldNameMap(inputSourceSettings);
        for (Object candidate : inputSchema.getFields()) {
            DppField field = (DppField) candidate;
            String fullFieldName = field.getFullFieldName();
            if (selectedNames == null || selectedNames.containsKey(fullFieldName)) {
                transformVertex.addTableFieldRelation(fullFieldName, field);
            }
        }
    }

    /**
     * Turns the selected field names of the settings into a lookup map.
     *
     * @param inputSourceSettings the settings whose selected fields are read
     * @return a map holding every selected field name mapped to {@code true}, or {@code null}
     *         when the settings have no selected fields
     */
    private Map<String, Boolean> getFieldNameMap(InputSourceSettings inputSourceSettings) {
        String[] names = inputSourceSettings.getSelectedFields();
        boolean hasSelection = names != null && names.length != 0;
        if (!hasSelection) {
            return null;
        }
        Map<String, Boolean> lookup = new HashMap<>(names.length);
        for (String name : names) {
            lookup.put(name, Boolean.TRUE);
        }
        return lookup;
    }
}
