package kd.ssc.task.partask;

import java.util.ArrayList;
import kd.bos.cache.CacheFactory;
import kd.bos.cache.DistributeSessionlessCache;
import kd.bos.exception.KDException;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.mq.MessageAcker;
import kd.bos.mq.MessageConsumer;
import kd.bos.util.StringUtils;
import kd.ssc.task.partask.service.impl.ParTaskNodeServiceImpl;
import kd.ssc.task.partask.util.CreateNewPartask;

/* loaded from: input_file:kd/ssc/task/partask/ParTaskConsumer.class */
/**
 * Message-queue consumer that creates a parallel task for each {@link ParTaskMessage} it receives.
 *
 * <p>Every message handled by {@link #onMessage} is acknowledged, whether it is skipped,
 * processed successfully, or fails. Messages are de-duplicated through a Redis key built from
 * the bill id, next node type id and instance id.
 */
public class ParTaskConsumer implements MessageConsumer {
    /** Not referenced anywhere in this class; kept unchanged. */
    private static final int RP_TIMES = 5;
    private static final Log log = LogFactory.getLog(ParTaskConsumer.class);
    private static final DistributeSessionlessCache cache = CacheFactory.getCommonCacheFactory().getDistributeSessionlessCache("parTask-consumer");

    /**
     * Error code of a {@link KDException} that is acknowledged without marking the node as failed.
     * NOTE(review): the business meaning of this code is defined by the task-creation code — confirm.
     */
    private static final String IGNORABLE_ERROR_CODE = "-2";

    /**
     * Lua script that sets the key only if it is absent and then gives it a 60-second expiry.
     * Returns 0 when the key already existed, otherwise the result of EXPIRE.
     */
    private static final String DEDUP_SCRIPT = "if redis.call('SETNX',KEYS[1],'1') == 0 then return 0 else return redis.call('EXPIRE',KEYS[1],60) end";

    /**
     * Consumes one message from the queue.
     *
     * @param obj          the message payload; must be a non-empty {@link ParTaskMessage} to be processed
     * @param str          the message id passed back to {@code messageAcker.ack}
     * @param z            whether the message is a resend (only logged)
     * @param messageAcker used to acknowledge the message
     * @throws KDException rethrown when task creation fails with an error code other than
     *                     {@value #IGNORABLE_ERROR_CODE}; the message is still acknowledged
     */
    public void onMessage(Object obj, String str, boolean z, MessageAcker messageAcker) throws KDException {
        log.info("成功进入并行任务消费者实现类:" + getClass().getName() + "，mq传入参数: " + obj + ", " + str + ", is resend: " + z);
        if (obj == null || StringUtils.isEmpty(obj.toString())) {
            log.error("消息队列中 message 为空（或为null）,不用消费");
            messageAcker.ack(str);
            return;
        }
        if (!(obj instanceof ParTaskMessage)) {
            log.error("消息队列中 message 类型不匹配 " + ParTaskMessage.class.getName());
            messageAcker.ack(str);
            return;
        }
        ParTaskMessage parTaskMessage = (ParTaskMessage) obj;
        String dedupKey = parTaskMessage.getBillId() + "-" + parTaskMessage.getNextNodeTypeId() + "-" + parTaskMessage.getInstanceId();
        boolean duplicate = false;
        try {
            duplicate = !"1".equals(cache.eval(DEDUP_SCRIPT, dedupKey, new ArrayList<>()).toString());
            if (duplicate) {
                log.error("并行任务重复消费，key = " + dedupKey);
                return;
            }
            createTask(parTaskMessage);
        } finally {
            // Acknowledge exactly once, outside the creation-failure handler, so that an
            // ack failure is never recorded as a task-creation failure.
            messageAcker.ack(str);
            if (!duplicate) {
                log.info("完成并行任务消费");
            }
        }
    }

    /**
     * Creates the parallel task and records a failure when creation returns id 0 or throws.
     *
     * @param parTaskMessage the message describing the task to create
     * @throws KDException rethrown after the failure has been recorded, unless its error code
     *                     is {@value #IGNORABLE_ERROR_CODE}
     */
    private void createTask(ParTaskMessage parTaskMessage) throws KDException {
        try {
            long taskId = CreateNewPartask.createNewTask(parTaskMessage).longValue();
            log.info("创建并行任务end，任务id:" + taskId);
            if (taskId == 0) {
                log.error("并行任务创建失败,[实例id:节点id]:[" + parTaskMessage.getInstanceId() + ":" + parTaskMessage.getNextNodeDefId() + "]");
                new ParTaskNodeServiceImpl().updateTableAfterCreateParTaskFailed(parTaskMessage.getInstanceId(), parTaskMessage.getNextNodeDefId());
            }
        } catch (KDException e) {
            // Guard against an exception that carries no error code, so the original
            // exception is not replaced by a NullPointerException.
            boolean ignorable = e.getErrorCode() != null && IGNORABLE_ERROR_CODE.equals(e.getErrorCode().getCode());
            if (ignorable) {
                return;
            }
            log.error("[实例id:节点id]:[" + parTaskMessage.getInstanceId() + ":" + parTaskMessage.getNextNodeDefId() + "]的并行任务创建过程中产生异常：" + e.getMessage(), e);
            new ParTaskNodeServiceImpl().updateTableAfterCreateParTaskFailed(parTaskMessage.getInstanceId(), parTaskMessage.getNextNodeDefId());
            throw e;
        }
    }
}
