package kd.bos.xdb.task.service.indexmove;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringReader;
import java.io.StringWriter;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import kd.bos.bundle.BosRes;
import kd.bos.db.DB;
import kd.bos.db.sharding.ShardTaskRuntime;
import kd.bos.db.sharding.ShardingManager;
import kd.bos.xdb.XDBConfig;
import kd.bos.xdb.XDBManagerUtil;
import kd.bos.xdb.entity.ShardProgressEntity;
import kd.bos.xdb.entity.ShardTaskEntity;
import kd.bos.xdb.enums.ShardTaskNodeEnum;
import kd.bos.xdb.enums.ShardTaskStatusEnum;
import kd.bos.xdb.exception.ExceptionUtil;
import kd.bos.xdb.mq.ShardLogPublish;
import kd.bos.xdb.repository.ShardConfigRepository;
import kd.bos.xdb.repository.ShardProgressRepository;
import kd.bos.xdb.repository.ShardTaskRepository;
import kd.bos.xdb.service.ActionUtil;
import kd.bos.xdb.service.ShardTaskConfig;
import kd.bos.xdb.service.action.parallel.ShardThreadPool;
import kd.bos.xdb.service.calc.CountShardingTableCall;
import kd.bos.xdb.service.calc.CountTableCall;
import kd.bos.xdb.sharding.config.IndexDefine;
import kd.bos.xdb.tablemanager.TableManager;
import kd.bos.xdb.tablemanager.TableName;
import kd.bos.xdb.task.config.Configuration;
import kd.bos.xdb.task.progress.ProgressUtil;
import kd.bos.xdb.task.progress.SubProgress;
import kd.bos.xdb.task.service.ShardingTaskServiceAbst;
import kd.bos.xdb.task.service.indexmove.work.IndexMoveWork;
import kd.bos.xdb.task.service.indexmove.work.IndexMoveWorkRunner;
import kd.bos.xdb.util.Threads;

/* loaded from: input_file:kd/bos/xdb/task/service/indexmove/ShardingIndexMoveService.class */
/**
 * Sharding task service for the {@link ShardTaskNodeEnum#INDEXMOVE} node.
 *
 * <p>{@link #doSharding()} creates the PK temp table and per-shard moving tables when needed,
 * runs one {@link IndexMoveWork} per unexecuted progress row (serially or on a
 * {@link ShardThreadPool}), then drops the moving tables, settles the PK table and rewrites the
 * stored strategy parameters with the configured fast index.
 */
public class ShardingIndexMoveService extends ShardingTaskServiceAbst {
    public ShardingIndexMoveService(ShardTaskEntity shardTaskEntity, Configuration configuration) {
        super(shardTaskEntity, configuration, ShardTaskNodeEnum.INDEXMOVE);
    }

    /**
     * Executes the index-move step.
     *
     * @return {@code true} only when the work was paused while unclosed progress rows remain
     *         (the task is then marked suspended); {@code false} when the fast-index validation
     *         rejects the step or when the step ran to completion
     * @throws Exception any failure raised by the underlying table, repository or worker calls;
     *         also thrown when unclosed progress rows remain although nothing is left to execute
     */
    @Override // kd.bos.xdb.task.service.ShardingTaskServiceAbst
    public boolean doSharding() throws Exception {
        Set<IndexDefine> fastIndexDefineSet = this.configuration.getFastIndexDefineSet();
        if (!XDBManagerUtil.validateFastIndexConfigurable(this.configuration.getFastIndex(), this.configuration.getLastFastIndex())) {
            this.mainProgress.setProgressDesc_1(BosRes.get("bos-xdb-manager", "ShardingIndexMoveService_1", "快速索引已设置。", new Object[0]));
            this.mainProgress.store(true);
            return false;
        }
        TableManager tableManager = XDBConfig.getTableManager();
        boolean isIndexPK = this.configuration.getMainShardingConfig().isIndexPK();
        TableName mainTableName = TableName.of(this.configuration.getMainTable());
        if (isIndexPK && !tableManager.existTable(mainTableName.getPKTempTable())) {
            tableManager.createPKTempTable(
                    mainTableName.getPKTempTable(),
                    XDBManagerUtil.getPkTypeEnum(this.taskEntity.getEntitynumber()),
                    this.configuration.getMainShardingConfig().getOptions().getDataRowsRange(),
                    fastIndexDefineSet.toArray(new IndexDefine[fastIndexDefineSet.size()]));
        }

        if (this.mainProgress.isCurStepExecuted()) {
            // The step already ran once: restore the counters persisted on the task entity.
            this.mainProgress.setProgressDesc_1(BosRes.get("bos-xdb-manager", "ShardingIndexMoveService_1", "继续索引数据迁移", new Object[0]));
            this.mainProgress.setTotalRecord(this.taskEntity.getTotalRecord());
            this.mainProgress.setMovedRecord(this.taskEntity.getMovingRecord());
            this.mainProgress.store(true);
        } else {
            prepareFirstRun(tableManager);
        }

        List<ShardProgressEntity> pending = ShardProgressRepository.get().loadUnexecutedProgressList(this.taskEntity.getId(), null);
        boolean paused = false;
        if (pending.isEmpty()) {
            if (ShardProgressRepository.get().countProgressUnclosed(this.taskEntity.getId()) > 0) {
                throw ExceptionUtil.wrap(BosRes.get("bos-xdb-manager", "ShardingIndexMoveService_2", "存在未完成的并行索引迁移任务", new Object[0]));
            }
            XDBManagerUtil.logInfo(MessageFormat.format("ShardTaskMovingHandler indexMove doIndexMoving end,entitynumber:{0}, taskId:{1}, progressCount:{2}", this.taskEntity.getEntitynumber(), Long.valueOf(this.taskEntity.getId()), Integer.valueOf(pending.size())));
        } else if (!ShardTaskConfig.isEnableMovingParallel()) {
            paused = moveSerially(pending);
        } else {
            paused = moveInParallel(pending);
        }

        long unclosedCount = ShardProgressRepository.get().countProgressUnclosed(this.taskEntity.getId());
        if (paused) {
            if (unclosedCount != 0) {
                ShardTaskRepository.get().setTaskSuspended(this.taskEntity.getId());
                XDBManagerUtil.logInfo(MessageFormat.format("ArchiveTaskHandler ShardingIndexMoveService doSharding paused,entitynumber:{0}, taskId:{1}", this.taskEntity.getEntitynumber(), Long.valueOf(this.taskEntity.getId())));
                ShardLogPublish.get().publishOperationLog(this.taskEntity.getId(), this.taskEntity.getEntitynumber(), BosRes.get("bos-xdb-manager", "ShardingIndexMoveService_3", "任务暂停", new Object[0]), getProgressType());
                return true;
            }
            // Paused, but every progress row is already closed: carry on to the cleanup phase.
            clearPauseState();
        } else if (ShardTaskRuntime.get().isTaskPaused(this.configuration.getMainTable())) {
            clearPauseState();
        }
        if (unclosedCount > 0) {
            throw new RuntimeException(BosRes.get("bos-xdb-manager", "ShardingIndexMoveService_4", "存在未完成的并行迁移任务", new Object[0]));
        }

        this.mainProgress.setProgressDesc_1(BosRes.get("bos-xdb-manager", "ShardingIndexMoveService_5", "删除中间表", new Object[0]));
        this.mainProgress.store(true);
        dropMovingTables(tableManager, mainTableName);

        if (!this.mainProgress.isRenamePkTempTable()) {
            settlePkTable(mainTableName);
        }

        ShardConfigRepository.get().setConfigParameter(
                this.configuration.getConfigEntity().getId(),
                rebuildShardingStrategtParameter(this.configuration.getConfigEntity().getStrategyparams(), this.configuration.getFastIndex()));
        this.mainProgress.setProgressDesc_1(BosRes.get("bos-xdb-manager", "ShardingIndexMoveService_8", "索引数据迁移完成", new Object[0]));
        this.mainProgress.store(true);
        return false;
    }

    /**
     * First execution of the step: counts the rows of every source table, creates the missing
     * moving tables together with their progress rows, and stores the total on the main progress.
     */
    private void prepareFirstRun(TableManager tableManager) throws Exception {
        String originalName = TableName.of(this.configuration.getMainShardingConfig().getTable()).getOriginalName();
        List<String> shardingTables = Arrays.asList(tableManager.getShardingTable(originalName));

        List<Future<?>> countFutures = new ArrayList<>(shardingTables.size());
        if (shardingTables.isEmpty()) {
            // No shard tables are registered: count the prototype table instead.
            countFutures.add(calcCountPools.submit(Threads.wrapCallable(new CountShardingTableCall(this.configuration.getRoute(), TableName.of(originalName).getPrototypeTable(), originalName))));
        } else {
            for (String shardingTable : shardingTables) {
                countFutures.add(calcCountPools.submit(Threads.wrapCallable(new CountShardingTableCall(this.configuration.getRoute(), shardingTable, originalName))));
            }
        }

        // Wait for every count before touching any table, as the original code did.
        List<CountTableCall> countResults = new ArrayList<>(countFutures.size());
        for (Future<?> countFuture : countFutures) {
            countResults.add((CountTableCall) countFuture.get());
        }

        long totalRecords = 0L;
        for (CountTableCall countResult : countResults) {
            String shardTable = countResult.getShardTable();
            TableName shardTableName = TableName.of(shardTable);
            long count = countResult.getCount();
            totalRecords += count;
            long shardingIndex = shardTableName.isPrototypeTable() ? -1L : shardTableName.getShardingIndex();
            String movingTable = shardTableName.getMovingTable(shardingIndex);
            if (!tableManager.existTable(movingTable)) {
                tableManager.createMovingTable(movingTable, shardingIndex, ActionUtil.getPkTypeEnum(this.taskEntity.getEntitynumber()));
                ProgressUtil.insertProgressTable(this.taskEntity, shardTable, shardingIndex, count);
            }
        }

        this.mainProgress.setProgressDesc_1(BosRes.get("bos-xdb-manager", "ShardingIndexMoveService_0", "开始索引数据迁移,首次执行...", new Object[0]));
        this.mainProgress.setMovedRecord(0L);
        this.mainProgress.setSourceTableCount(shardingTables.size());
        this.mainProgress.setCurStepExecuted(true);
        ProgressUtil.storeTotalRecord(this.taskEntity, totalRecords, this.mainProgress);
    }

    /** Builds the sub-progress tracker for one progress row, parented to the main progress. */
    private SubProgress newSubProgress(ShardProgressEntity progress) throws Exception {
        SubProgress subProgress = SubProgress.of(progress.getId(), this.taskEntity.getEntitynumber(), progress.getProgresssign());
        subProgress.setParentSp(this.mainProgress);
        subProgress.setMovingTable(progress.getShardTable());
        return subProgress;
    }

    /**
     * Runs the pending progress rows one after another on the calling thread.
     *
     * @return {@code true} as soon as the runtime reports the task paused or a worker's
     *         {@code doWork()} returns {@code true}; {@code false} when every row was processed
     */
    private boolean moveSerially(List<ShardProgressEntity> pending) throws Exception {
        for (ShardProgressEntity progress : pending) {
            if (this.shardTaskRuntime.isTaskPaused(this.configuration.getMainTable())) {
                return true;
            }
            SubProgress subProgress = newSubProgress(progress);
            if (new IndexMoveWork(progress, this.taskEntity, this.configuration, subProgress).doWork()) {
                return true;
            }
            this.mainProgress.setExecSql(null);
            this.mainProgress.store(false);
        }
        return false;
    }

    /**
     * Runs the pending progress rows on a dedicated {@link ShardThreadPool}.
     *
     * <p>On the first failing future the error is logged and published, the task is flagged as
     * interrupted, every remaining future is cancelled, and after the pool has terminated the
     * first error text is rethrown through {@link ExceptionUtil#wrap}.
     *
     * @return {@code true} when any worker result was {@code true}
     */
    private boolean moveInParallel(List<ShardProgressEntity> pending) throws Exception {
        boolean paused = false;
        try (ShardThreadPool pool = new ShardThreadPool()) {
            pool.setTable(this.taskEntity.getEntitynumber());
            pool.setName("XDB-IndexMoveParallelThread-");
            pool.start();

            List<Future<?>> futures = new ArrayList<>(pending.size());
            for (ShardProgressEntity progress : pending) {
                SubProgress subProgress = newSubProgress(progress);
                IndexMoveWork work = new IndexMoveWork(progress, this.taskEntity, this.configuration, subProgress);
                futures.add(pool.submit(Threads.wrapCallable(new IndexMoveWorkRunner(work, this.configuration.getRoute(), progress))));
            }

            boolean failed = false;
            String firstError = "";
            for (Future<?> future : futures) {
                // The try must cover get() as well as cancel(): otherwise a failing worker
                // bypasses the logging/interrupt/shutdown handling below entirely.
                try {
                    if (failed) {
                        future.cancel(true);
                    } else if (paused) {
                        // Result no longer matters, but still wait for the worker to finish.
                        future.get();
                    } else {
                        paused = ((Boolean) future.get()).booleanValue();
                    }
                } catch (Throwable error) {
                    StringWriter stackTrace = new StringWriter();
                    error.printStackTrace(new PrintWriter(stackTrace));
                    String message = MessageFormat.format("ShardTaskMovingHandler ShardingDataMoveService future.get error,entitynumber:{0}, taskId:{1}, isException:{2}, errorinfo:{3}", this.taskEntity.getEntitynumber(), Long.valueOf(this.taskEntity.getId()), Boolean.valueOf(failed), stackTrace.toString());
                    XDBManagerUtil.logError(message, error);
                    ShardTaskRuntime.get().setTaskInterruptedCurrentNode(true, this.configuration.getMainTable());
                    ShardLogPublish.get().publishOperationLog(this.taskEntity.getId(), this.taskEntity.getEntitynumber(), message, getProgressType());
                    if (!failed) {
                        // Only the first failure is reported to the caller.
                        firstError = stackTrace.toString();
                        failed = true;
                    }
                }
            }

            if (failed) {
                pool.shutdown();
                while (!pool.isTerminated()) {
                    Thread.sleep(200L);
                }
                throw ExceptionUtil.wrap(firstError);
            }
        }
        return paused;
    }

    /** Moves the task status from PAUSE back to EXECUTING and notifies the sharding manager. */
    private void clearPauseState() {
        ShardTaskRepository.get().setNextTaskstatus(this.taskEntity.getId(), ShardTaskStatusEnum.PAUSE, ShardTaskStatusEnum.EXECUTING);
        ShardingManager.get().notifyLimitTaskPaused(false, this.configuration.getMainTable());
    }

    /** Drops every moving table (one per shard table, or the index -1 one when there are none). */
    private void dropMovingTables(TableManager tableManager, TableName mainTableName) throws Exception {
        String originalName = TableName.of(this.configuration.getMainShardingConfig().getTable()).getOriginalName();
        List<String> shardingTables = Arrays.asList(tableManager.getShardingTable(originalName));
        if (shardingTables.isEmpty()) {
            String movingTable = mainTableName.getMovingTable(-1L);
            ActionUtil.dropTable(this.configuration.getRoute(), movingTable);
            tableManager.removeCahce(movingTable);
            return;
        }
        for (String shardingTable : shardingTables) {
            TableName shardTableName = TableName.of(shardingTable);
            String movingTable = shardTableName.getMovingTable(shardTableName.getShardingIndex());
            ActionUtil.dropTable(this.configuration.getRoute(), movingTable);
            tableManager.removeCahce(movingTable);
        }
    }

    /**
     * Settles the PK table: when the temp table holds rows, or the PK table holds none, the PK
     * table is dropped and {@code renameTable} is invoked for the pair; otherwise the temp table
     * is dropped. The outcome is recorded on the main progress so it is not repeated.
     */
    private void settlePkTable(TableName mainTableName) throws Exception {
        boolean tempHasData = countPKData(mainTableName.getPKTempTable());
        boolean pkHasData = countPKData(mainTableName.getPKTable());
        if (tempHasData || !pkHasData) {
            ActionUtil.dropTable(this.configuration.getRoute(), mainTableName.getPKTable());
            ActionUtil.renameTable(this.configuration.getRoute(), mainTableName.getPKTable(), mainTableName.getPKTempTable());
            this.mainProgress.setProgressDesc_1(BosRes.get("bos-xdb-manager", "ShardingIndexMoveService_7", "重命名临时表", new Object[0]));
        } else {
            ActionUtil.dropTable(this.configuration.getRoute(), mainTableName.getPKTempTable());
            this.mainProgress.setProgressDesc_1(BosRes.get("bos-xdb-manager", "ShardingIndexMoveService_6", "移除PK临时表", new Object[0]));
        }
        this.mainProgress.setRenamePkTempTable();
        this.mainProgress.store(true);
    }

    /**
     * Reports whether the given table exists on the configured route and contains at least one
     * row. Despite the name, no count is computed.
     *
     * @param str table name; concatenated into the SQL text, so it must come from trusted code
     */
    private boolean countPKData(String str) {
        if (!DB.exitsTable(this.configuration.getRoute(), str)) {
            return false;
        }
        return ((Boolean) DB.query(this.configuration.getRoute(), "select top 1 fpk from " + str, resultSet -> resultSet.next())).booleanValue();
    }

    /**
     * Rebuilds the strategy parameter text so that {@code indices} carries the new fast index.
     *
     * <p>Emits, in order and only when present in the input: {@code range}, the new
     * {@code indices}, {@code cust_class} followed by the raw input text after its value,
     * {@code mod}, {@code delim}, and {@code pattern} followed by the raw input text after its
     * value.
     *
     * @param str  current strategy parameters in {@link Properties} text format
     * @param str2 value to write for the {@code indices} key
     * @return the rebuilt parameter text
     */
    private String rebuildShardingStrategtParameter(String str, String str2) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(str));
        } catch (IOException e) {
            throw ExceptionUtil.wrap(e);
        }

        StringBuilder rebuilt = new StringBuilder();
        appendIfPresent(rebuilt, "range", properties.getProperty("range"));
        rebuilt.append("indices=").append(str2).append('\n');

        String custClass = properties.getProperty("cust_class");
        if (custClass != null) {
            rebuilt.append("cust_class=").append(custClass).append('\n');
            rebuilt.append(rawTextAfter(str, custClass));
        }

        appendIfPresent(rebuilt, "mod", properties.getProperty("mod"));
        appendIfPresent(rebuilt, "delim", properties.getProperty("delim"));

        String pattern = properties.getProperty("pattern");
        if (pattern != null) {
            rebuilt.append("pattern=").append(pattern).append('\n');
            rebuilt.append(rawTextAfter(str, pattern));
        }
        return rebuilt.toString();
    }

    /** Appends {@code key=value\n} when the value is non-null. */
    private static void appendIfPresent(StringBuilder target, String key, String value) {
        if (value != null) {
            target.append(key).append('=').append(value).append('\n');
        }
    }

    /**
     * Returns the raw text that follows the first occurrence of {@code value} plus one separator
     * character. Returns an empty string when nothing follows (including a value that ends the
     * input without a trailing newline, which previously overran {@code substring}) or when the
     * value does not occur verbatim in the source (for example because {@link Properties}
     * unescaped it).
     */
    private static String rawTextAfter(String source, String value) {
        int valueAt = source.indexOf(value);
        if (valueAt < 0) {
            return "";
        }
        int tailStart = valueAt + value.length() + 1;
        return tailStart < source.length() ? source.substring(tailStart) : "";
    }
}
