package kd.fi.iep.task.impl;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kd.bos.algo.DataSet;
import kd.bos.algo.FilterFunction;
import kd.bos.algo.Row;
import kd.bos.context.RequestContext;
import kd.bos.dataentity.OperateOption;
import kd.bos.dataentity.resource.ResManager;
import kd.bos.entity.operate.result.OperationResult;
import kd.bos.ext.fi.fa.business.util.BillUtil;
import kd.bos.orm.query.QFilter;
import kd.bos.servicehelper.QueryServiceHelper;
import kd.bos.servicehelper.operation.OperationServiceHelper;
import kd.bos.threads.ThreadPool;
import kd.bos.threads.ThreadPools;
import kd.fi.iep.dao.FormDesignDao;
import kd.fi.iep.enums.ExecuteStatus;
import kd.fi.iep.enums.ExecuteType;
import kd.fi.iep.info.IepResManage;
import kd.fi.iep.info.IntellExceOperInfo;
import kd.fi.iep.info.IntellSchemeExecInfo;
import kd.fi.iep.task.AbstractExecute;
import kd.fi.iep.task.IntellExecuteContext;
import kd.fi.iep.util.IntellExecuteUtil;

/* loaded from: input_file:kd/fi/iep/task/impl/GeneralExecute.class */
/**
 * Executes one scheme operation over every bill selected by the scheme's filters.
 *
 * <p>{@link #doExecute()} queries the target bill ids, splits them per group into batches of at
 * most {@link #THREAD_DATA_LIMIT} ids, submits each batch to a shared thread pool and finally
 * folds the per-batch {@link ExecuteStatus} values into one status for the operation.
 *
 * <p>Dispatching is throttled: no more than {@link #THREAD_TOTAL} batches of this instance may be
 * submitted-but-unfinished at once. The in-flight counter and the dispatcher's wait/notify
 * hand-off are both guarded by {@code syncObj}.
 */
public class GeneralExecute extends AbstractExecute {
    /** Maximum number of batches of this instance that may be in flight (submitted, not finished). */
    private static final int THREAD_TOTAL = 20;
    /** In-flight count at or below which a waiting dispatcher is woken up again. */
    private static final int THREAD_RESUME_THRESHOLD = 10;
    /** Maximum number of bill ids handed to a single pool task. */
    private static final int THREAD_DATA_LIMIT = 1000;
    /** Not referenced inside this class; kept from the original declaration. */
    private static final int THREAD_DATA_LIMIT_TOTAL = 10000;
    /** Pool shared by all instances; created through {@code newFixedThreadPool} with argument 5. */
    private static final ThreadPool threadPoolIntellPlan = ThreadPools.newFixedThreadPool("fi/iep/intellAccountingExecPlan", 5);

    /** Monitor guarding the dispatcher hand-off ({@code dispatcherWaiting}, wait/notify). */
    private final Object syncObj;
    /** Number of in-flight batches; written only through the synchronized counter methods. */
    private int processThreadCount;
    /** True while the dispatching thread is blocked in {@code syncObj.wait()}; guarded by {@code syncObj}. */
    private boolean dispatcherWaiting;
    /** Handed to {@code removeFailBillId}; read afterwards to choose the "no data" message. */
    private final AtomicBoolean isExistFailBill;

    public GeneralExecute(IntellExecuteContext intellExecuteContext) {
        super(intellExecuteContext);
        this.syncObj = new Object();
        this.processThreadCount = 0;
        this.dispatcherWaiting = false;
        this.isExistFailBill = new AtomicBoolean(false);
    }

    /**
     * Queries the target bills, dispatches them in batches to the thread pool, waits for all
     * submitted batches and stores the aggregated status on the operation info.
     *
     * <p>No exception leaves this method: failures are logged. If the thread is interrupted while
     * waiting, the interrupt flag is restored before returning.
     */
    @Override // kd.fi.iep.task.AbstractExecute
    public void doExecute() {
        try {
            IntellExceOperInfo exceOperInfo = this.ctx.getSchemeExecInfo().getExceOperInfo();
            Long schemaId = exceOperInfo.getSchemaId();
            Date execstartdate = this.ctx.getExecstartdate();
            logger.info(schemaId + "开始获取目标数据...");
            List<QFilter> exceDataCollectionFilter = getExceDataCollectionFilter();
            logger.info("过滤条件:" + exceDataCollectionFilter);
            String[] groupbys = getGroupbys();
            // Always select "id"; distinct() avoids selecting it twice when it is also a group-by field.
            String selectFields = Stream.concat(Stream.of("id"), Arrays.stream(groupbys)).distinct().collect(Collectors.joining(BillUtil.COMMA));
            DataSet queryDataSet = QueryServiceHelper.queryDataSet("fi.iep.IntellAccountingExecPlan.getExceDataCollection", exceOperInfo.getBussiness(), selectFields, exceDataCollectionFilter.toArray(new QFilter[0]), "id");
            try {
                if (ExecuteType.AUTO == exceOperInfo.getExecuteType()) {
                    queryDataSet = excludeFailedBills(queryDataSet, exceOperInfo);
                }
                List<List<Long>> batches = splitIntoBatches(queryDataSet, groupbys);
                int sum = batches.stream().mapToInt(List::size).sum();
                IntellSchemeExecInfo schemeExecInfo = this.ctx.getSchemeExecInfo();
                schemeExecInfo.appendRecordTC(sum);
                logger.info(schemaId + "目标数据" + sum);
                if (sum == 0) {
                    logger.info("智能核算主线程退出--批处理数据:0");
                    if (this.isExistFailBill.get()) {
                        schemeExecInfo.setExecDetails(IepResManage.FAIL_DATA_FOUND);
                    } else {
                        schemeExecInfo.setExecDetails(String.format(ResManager.loadKDString("未获取到符合该方案【%1$s%2$s】操作的前置条件的数据。", "VoucherBatchBuildExecService_1", "bos-ext-fi", new Object[0]), FormDesignDao.getFormName(exceOperInfo.getBussiness()), exceOperInfo.getOperName()));
                    }
                    return;
                }
                int batchSize = getBatchSize(exceOperInfo);
                List<Future<ExecuteStatus>> futures = new ArrayList<>();
                for (List<Long> batch : batches) {
                    // Blocks while THREAD_TOTAL batches are in flight, then reserves a slot.
                    awaitDispatchSlot();
                    logger.info("1000一批智能核算主线程开始分发子线程");
                    futures.add(mutiThreadExecOperation(schemaId, sum, execstartdate, exceOperInfo, batch, batchSize));
                    if (IntellExecuteUtil.isStopExcute(this.ctx.getSchemeExecInfo())) {
                        break;
                    }
                }
                Set<ExecuteStatus> statuses = new HashSet<>(3);
                for (Future<ExecuteStatus> future : futures) {
                    statuses.add(future.get());
                }
                this.ctx.getSchemeExecInfo().getExceOperInfo().setExecuteStatus(resolveStatus(statuses, ExecuteStatus.FINISH));
                logger.info("智能核算主线程运行完成");
            } finally {
                // Single close point: also covers failures while filtering or splitting.
                queryDataSet.close();
            }
        } catch (InterruptedException e) {
            // Restore the flag so whoever owns this thread can still observe the interruption.
            Thread.currentThread().interrupt();
            logger.error(String.valueOf(e), e);
        } catch (Exception e) {
            String message = e.getMessage() != null ? e.getMessage() : e.toString();
            logger.error(message, e);
        }
    }

    /**
     * Returns {@code source} filtered so that rows whose "id" equals a "srcbillid" reported by
     * {@code AbstractExecute.removeFailBillId} are dropped.
     */
    private DataSet excludeFailedBills(DataSet source, IntellExceOperInfo exceOperInfo) {
        // NOTE(review): this DataSet is iterated to the end but never closed here, as before — confirm
        // whether the platform requires an explicit close().
        DataSet failedBills = AbstractExecute.removeFailBillId(exceOperInfo, this.isExistFailBill);
        final HashSet<Long> failedIds = new HashSet<>(16);
        failedBills.forEach(row -> {
            failedIds.add(row.getLong("srcbillid"));
        });
        logger.info(" remove fail billid size {}", Integer.valueOf(failedIds.size()));
        return source.filter(new FilterFunction() {
            private static final long serialVersionUID = -1593549109335713680L;

            @Override
            public boolean test(Row row) {
                return !failedIds.contains(row.getLong("id"));
            }
        });
    }

    /**
     * Splits {@code source} by the group-by fields and cuts every group's ids into consecutive
     * batches of at most {@link #THREAD_DATA_LIMIT} entries. A batch never spans two groups.
     */
    private List<List<Long>> splitIntoBatches(DataSet source, String[] groupbys) {
        List<List<Long>> batches = new ArrayList<>(16);
        for (DataSet group : source.splitByGroup(groupbys)) {
            List<Long> groupIds = new ArrayList<>();
            group.forEach(row -> {
                groupIds.add(row.getLong("id"));
            });
            int total = groupIds.size();
            for (int from = 0; from < total; from += THREAD_DATA_LIMIT) {
                batches.add(groupIds.subList(from, Math.min(from + THREAD_DATA_LIMIT, total)));
            }
        }
        return batches;
    }

    /**
     * Folds a set of batch results into one status: STOP wins over FAIL, FAIL wins over
     * {@code fallback}.
     */
    private static ExecuteStatus resolveStatus(Set<ExecuteStatus> statuses, ExecuteStatus fallback) {
        if (statuses.contains(ExecuteStatus.STOP)) {
            return ExecuteStatus.STOP;
        }
        if (statuses.contains(ExecuteStatus.FAIL)) {
            return ExecuteStatus.FAIL;
        }
        return fallback;
    }

    /**
     * Collects every "id" of {@code dataSet}; in AUTO mode the ids are additionally passed, in
     * chunks of 2000, to the inherited four-argument {@code removeFailBillId}.
     *
     * <p>NOTE(review): no caller of this private method exists inside this class.
     *
     * @return the set of collected ids (the inherited call receives this set and may modify it —
     *         TODO confirm against {@code AbstractExecute})
     */
    private Set<Long> batchRemoveFailBillId(IntellExceOperInfo intellExceOperInfo, DataSet dataSet) {
        HashSet<Long> allIds = new HashSet<>(16);
        HashSet<Long> chunk = new HashSet<>(16);
        while (dataSet.hasNext()) {
            Long id = dataSet.next().getLong("id");
            allIds.add(id);
            if (ExecuteType.AUTO != this.ctx.getType()) {
                continue;
            }
            chunk.add(id);
            // Flush on every 2000th distinct id and once more for the final partial chunk.
            if (chunk.size() % 2000 == 0 || !dataSet.hasNext()) {
                boolean foundFail = removeFailBillId(allIds, chunk, intellExceOperInfo.getBussiness(), intellExceOperInfo);
                // Only ever flips the flag from false to true.
                this.isExistFailBill.compareAndSet(false, foundFail);
                logger.info(" remove fail billid size {}", Integer.valueOf(chunk.size()));
                chunk.clear();
                if (IntellExecuteUtil.isStopExcute(this.ctx.getSchemeExecInfo())) {
                    break;
                }
            }
        }
        return allIds;
    }

    /**
     * Runs the configured operation of the current scheme on the given primary keys.
     *
     * @param objArr primary keys passed through to {@code OperationServiceHelper.executeOperate}
     * @param operateOption options passed through unchanged
     * @return the result returned by {@code OperationServiceHelper.executeOperate}
     */
    @Override // kd.fi.iep.task.AbstractExecute
    protected OperationResult invokeOperation(Object[] objArr, OperateOption operateOption) {
        IntellExceOperInfo exceOperInfo = this.ctx.getSchemeExecInfo().getExceOperInfo();
        return OperationServiceHelper.executeOperate(exceOperInfo.getOper(), exceOperInfo.getBussiness(), objArr, operateOption);
    }

    /**
     * Submits one batch to the shared pool. The caller must already have reserved a dispatch slot
     * via {@link #awaitDispatchSlot()}; the task releases it when it ends, normally or not.
     */
    private Future<ExecuteStatus> mutiThreadExecOperation(Long schemaId, int totalCount, Date execStartDate, IntellExceOperInfo exceOperInfo, List<Long> batch, int batchSize) {
        class OperationRunnable implements Callable<ExecuteStatus> {
            @Override
            public ExecuteStatus call() {
                ExecuteStatus executeStatus = ExecuteStatus.FINISH;
                try {
                    RequestContext.get().setTraceId(GeneralExecute.this.ctx.getTraceId());
                    logger.info("智能核算进入单个线程调度:{};traceId:{}", schemaId, GeneralExecute.this.ctx.getTraceId());
                    long startMillis = System.currentTimeMillis();
                    executeStatus = exeOperation(batch, schemaId.longValue(), totalCount, execStartDate, exceOperInfo, batchSize);
                    logger.info("智能核算进行的线程数:{}", Integer.valueOf(currentThreadCount()));
                    logger.info("智能核算完成单个线程调度: {};耗时:{}ms", schemaId, Long.valueOf(System.currentTimeMillis() - startMillis));
                } catch (Exception e) {
                    // NOTE(review): as before, a batch that throws still reports FINISH — confirm
                    // whether FAIL is the intended outcome here.
                    logger.error("智能核算单个线程发生报错:" + e, e);
                } finally {
                    releaseDispatchSlot();
                }
                return executeStatus;
            }
        }
        return threadPoolIntellPlan.submit(new OperationRunnable());
    }

    /**
     * Blocks the dispatching thread while {@link #THREAD_TOTAL} batches are in flight, then
     * reserves a slot by incrementing the counter.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void awaitDispatchSlot() throws InterruptedException {
        synchronized (this.syncObj) {
            // Loop, not "if": Object.wait() may return without a matching notify.
            while (currentThreadCount() >= THREAD_TOTAL) {
                logger.info("智能核算主线程等待");
                this.dispatcherWaiting = true;
                try {
                    this.syncObj.wait();
                } finally {
                    this.dispatcherWaiting = false;
                }
            }
            threadCountAdd();
        }
    }

    /**
     * Releases the slot of a finished batch and wakes a waiting dispatcher once the in-flight
     * count has dropped to {@link #THREAD_RESUME_THRESHOLD} or below.
     */
    private void releaseDispatchSlot() {
        // Decrement, check and notify under one monitor so that concurrent finishers cannot
        // jump over the threshold without anybody notifying.
        synchronized (this.syncObj) {
            threadCountSub();
            if (this.dispatcherWaiting && currentThreadCount() <= THREAD_RESUME_THRESHOLD) {
                logger.info("智能核算主线程唤醒");
                this.syncObj.notifyAll();
            }
        }
    }

    /** Returns the current in-flight batch count under the same lock the counter methods use. */
    private synchronized int currentThreadCount() {
        return this.processThreadCount;
    }

    /** Increments the in-flight batch counter. */
    public synchronized void threadCountAdd() {
        this.processThreadCount++;
    }

    /** Decrements the in-flight batch counter. */
    public synchronized void threadCountSub() {
        this.processThreadCount--;
    }

    /**
     * Feeds {@code list} to {@code startExecOperation} in consecutive chunks.
     *
     * <p>A chunk is flushed when it holds exactly {@code i2} ids or when the last id has been
     * added, so a non-positive {@code i2} results in a single chunk containing everything.
     *
     * @param list bill ids to process
     * @param j scheme id, passed through to {@code startExecOperation}
     * @param i total record count of the run, passed through
     * @param date execution start date, passed through
     * @param intellExceOperInfo operation descriptor, passed through
     * @param i2 number of ids per chunk
     * @return STOP if any chunk returned STOP, otherwise FAIL if any chunk returned FAIL,
     *         otherwise the status of the last chunk (FINISH when {@code list} is empty)
     */
    protected ExecuteStatus exeOperation(List<Long> list, long j, int i, Date date, IntellExceOperInfo intellExceOperInfo, int i2) {
        ExecuteStatus lastStatus = ExecuteStatus.FINISH;
        StringBuffer stringBuffer = new StringBuffer();
        Set<ExecuteStatus> statuses = new HashSet<>(3);
        List<Long> pending = new ArrayList<>();
        int lastIndex = list.size() - 1;
        for (int idx = 0; idx <= lastIndex; idx++) {
            pending.add(list.get(idx));
            if (pending.size() == i2 || idx == lastIndex) {
                lastStatus = startExecOperation(j, i, date, intellExceOperInfo, stringBuffer, pending);
                statuses.add(lastStatus);
                // Fresh list per chunk: the flushed one has been handed to startExecOperation.
                pending = new ArrayList<>();
            }
        }
        return resolveStatus(statuses, lastStatus);
    }

    /** Returns 1 when the operation is flagged single, otherwise its configured batch size. */
    private int getBatchSize(IntellExceOperInfo intellExceOperInfo) {
        if (intellExceOperInfo.isSingle()) {
            return 1;
        }
        return intellExceOperInfo.getEachbatchsize();
    }
}
