package kd.macc.faf.datasync.exec;

import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kd.bos.algo.DataType;
import kd.bos.algo.Field;
import kd.bos.algo.Input;
import kd.bos.algox.DataSetX;
import kd.bos.algox.JobSession;
import kd.bos.algox.JoinDataSetX;
import kd.macc.faf.algox.FAFAlgoX;
import kd.macc.faf.algox.FAFAlgoXConstants;
import kd.macc.faf.algox.FAFJoinLinkFunction;
import kd.macc.faf.algox.FAFJoinLinkInfo;
import kd.macc.faf.algox.FAFSQLBuilder;
import kd.macc.faf.algox.XDbOutput;
import kd.macc.faf.datasync.exec.data.DataSyncModel;
import kd.macc.faf.datasync.exec.data.DataSyncSummary;
import kd.macc.faf.datasync.exec.func.FillDimensionHashMapFunction;
import kd.macc.faf.datasync.exec.input.sum.MergeInput;
import kd.macc.faf.datasync.func.SyncDataToSumGroupReduceFunction;
import kd.macc.faf.summary.FAFSummaryCalculateDTO;
import kd.macc.faf.summary.FAFSummaryCalculateGroupReduceFunction;
import kd.macc.faf.summary.FAFSummaryDataDTO;

/* loaded from: input_file:kd/macc/faf/datasync/exec/AlgoxSummaryExec.class */
/**
 * {@link Exec} implementation that walks the {@link MergeInput}s of a {@link DataSyncSummary}
 * in batches and runs one algox pipeline per merge input on the supplied {@link JobSession}.
 *
 * <p>Each call to {@link #exec(JobSession)} consumes up to {@value #BATCH_SIZE} merge inputs;
 * callers are expected to keep calling it while {@link #hasNext()} returns {@code true}.
 * The instance keeps a mutable cursor ({@code pos}) and performs no synchronization itself.
 */
public class AlgoxSummaryExec implements Exec<JobSession, Boolean> {
    /** Maximum number of merge inputs processed by a single {@link #exec(JobSession)} call. */
    private static final int BATCH_SIZE = 1000;

    private final DataSyncSummary syncSummary;
    private final List<MergeInput> mergeInputs;

    /** Index of the first merge input that has not yet been handed to a batch. */
    private int pos = 0;

    /**
     * Creates an executor over the merge inputs exposed by the given summary.
     *
     * @param dataSyncSummary source of the merge inputs, the sync model and the summary DTO
     */
    public AlgoxSummaryExec(DataSyncSummary dataSyncSummary) {
        this.syncSummary = dataSyncSummary;
        this.mergeInputs = dataSyncSummary.getMergeInputs();
    }

    /**
     * Returns a copy of {@code groupFields} in which every occurrence of the link's left field
     * is replaced, in place, by the link's replacement group fields; all other entries are kept
     * unchanged and in their original order.
     *
     * @param groupFields the original group-by field names
     * @param link        join link providing the field to replace and its replacements
     * @return the expanded group-by field names
     */
    private static String[] replaceGrouperFields(String[] groupFields, FAFJoinLinkInfo link) {
        String leftField = link.getLeftField();
        return Arrays.stream(groupFields)
                .flatMap(field -> field.equals(leftField)
                        // Explicit array cast so the replacement fields are spread as varargs.
                        ? Stream.of((Object[]) link.getReplaceGroupField())
                        : Stream.of(field))
                .toArray(String[]::new);
    }

    /**
     * Reports whether there are merge inputs that have not been processed yet.
     *
     * @return {@code true} if a merge-input list exists and the cursor has not reached its end
     */
    @Override
    public boolean hasNext() {
        return this.mergeInputs != null && this.pos < this.mergeInputs.size();
    }

    /**
     * Processes the next batch of at most {@value #BATCH_SIZE} merge inputs.
     *
     * <p>The cursor is advanced before the batch is executed, so a failure inside the batch
     * does not cause the same batch to be picked up again by a later call.
     *
     * @param jobSession the algox session on which the pipelines are built
     * @return {@link Boolean#FALSE} if nothing was left to process, otherwise
     *         {@link Boolean#TRUE} after every merge input of the batch has been handled
     */
    @Override
    public Boolean exec(JobSession jobSession) {
        if (!hasNext()) {
            return Boolean.FALSE;
        }
        // hasNext() guarantees pos < size, so this snapshot always holds at least one element.
        List<MergeInput> batch = this.mergeInputs.stream()
                .skip(this.pos)
                .limit(BATCH_SIZE)
                .collect(Collectors.toList());
        this.pos += BATCH_SIZE;
        for (MergeInput mergeInput : batch) {
            execSummary(jobSession, mergeInput);
        }
        return Boolean.TRUE;
    }

    /**
     * Builds the algox pipeline for a single merge input.
     *
     * <p>The detail rows are grouped by all dimension fields and reduced; the reduced rows are
     * then split by {@code collectstatus}: rows with status {@code 0} are written back through
     * the model's detail-table update SQL, while rows with status {@code 1} (optionally
     * combined with existing sum rows and re-calculated) are filtered on {@code summaryid = 1}
     * and written through the model's sum-table insert SQL.
     *
     * @param jobSession the algox session on which the pipeline is built
     * @param mergeInput the detail input and optional sum inputs to merge
     */
    private void execSummary(JobSession jobSession, MergeInput mergeInput) {
        FAFSummaryDataDTO summaryData = this.syncSummary.getFAFSummaryDataDTO();
        DataSyncModel model = this.syncSummary.getModel();
        String[] dimensionFields = (String[]) model.getAllDimensionFields().toArray(new String[0]);

        DataSetX detail = jobSession.fromInput(mergeInput.getDetailInput());
        if (model.haveHashDim()) {
            detail = detail.map(new FillDimensionHashMapFunction(detail.getRowMeta(), model.getHashDimensionList()));
        }
        DataSetX grouped = detail.groupBy(dimensionFields)
                .reduceGroup(new SyncDataToSumGroupReduceFunction(detail.getRowMeta(), summaryData));

        // Branch 1: rows flagged collectstatus = 1 feed the sum-table insert below.
        DataSetX collected = grouped.filter("collectstatus = 1");
        if (summaryData.needCaculate()) {
            Input[] sumInput = mergeInput.getSumInput();
            if (sumInput != null && sumInput.length > 0) {
                // Sum-side rows get a constant summaryid of 0 so both sides of the union
                // expose the same column.
                DataSetX existingSum = jobSession.fromInput(sumInput)
                        .addFields(new Field[]{new Field("summaryid", DataType.LongType)}, new Object[]{0L});
                collected = collected.union(existingSum);
            }
            FAFSummaryCalculateDTO calculateDTO = summaryData.buildCalculateDTO();
            FAFJoinLinkInfo link = calculateDTO.getLink();
            JoinDataSetX joined = collected
                    .leftJoin(jobSession.fromInput(FAFAlgoX.createOrmInput(link)))
                    .on(link.getLeftField(), link.getRightField())
                    .withFunc(new FAFJoinLinkFunction(collected.getRowMeta(), link));
            collected = joined.groupBy(replaceGrouperFields(dimensionFields, link))
                    .reduceGroup(new FAFSummaryCalculateGroupReduceFunction(
                            collected.getRowMeta(), joined.getRowMeta(), calculateDTO));
        }

        // Branch 2: rows flagged collectstatus = 0 are written back via the detail-table update.
        DataSetX uncollected = grouped.filter("collectstatus = 0");
        FAFSQLBuilder updateDetailSql = model.getUpdateDetailTableSQL();
        XDbOutput detailOutput = new XDbOutput("fias", updateDetailSql.toUpdateSQL(), updateDetailSql.getRowMeta());
        if (model.haveHashDim()) {
            uncollected.select(new String[]{"summaryid", "dimhash", FAFAlgoXConstants.ID}).output(detailOutput);
        } else {
            uncollected.select(new String[]{"summaryid", FAFAlgoXConstants.ID}).output(detailOutput);
        }

        // Only rows with summaryid = 1 are inserted; the bookkeeping columns are dropped first.
        DataSetX sumRows = collected.filter("summaryid = 1")
                .removeFields(new String[]{"summaryid", "operationstatus", "collectstatus"});
        if (model.isHaveTimeField()) {
            // The creation timestamp is fixed when the pipeline is built, not per row.
            sumRows = sumRows.addFields(
                    new Field[]{new Field("fcreatetime", DataType.TimestampType)}, new Object[]{new Date()});
        }
        sumRows.output(new XDbOutput("fias", model.getInsertSumTableSQL(), sumRows.getRowMeta()));
    }
}
