package kd.isc.iscb.platform.core.dc.mq.rocketmq;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import kd.bos.dataentity.entity.DynamicObject;
import kd.bos.dataentity.resource.ResManager;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.util.StringUtils;
import kd.isc.iscb.platform.core.cache.data.PublishTopicSchema;
import kd.isc.iscb.platform.core.connector.ConnectorError;
import kd.isc.iscb.platform.core.connector.k3cloud.K3CloudConstant;
import kd.isc.iscb.platform.core.dc.mq.MessageQueueServer;
import kd.isc.iscb.platform.core.dc.mq.MessageReceiver;
import kd.isc.iscb.platform.core.dc.mq.PublishedMessage;
import kd.isc.iscb.platform.core.vc.MappingResultImportJob;
import kd.isc.iscb.util.data.ReadLockFreeMap;
import kd.isc.iscb.util.dt.D;
import kd.isc.iscb.util.misc.StringUtil;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

/* loaded from: input_file:kd/isc/iscb/platform/core/dc/mq/rocketmq/RocketMqServer.class */
public class RocketMqServer implements MessageQueueServer {
    private static final String BOOTSTRAP_SERVERS = "bootstrap_servers";
    private static final String NUMBER = "number";
    private static final String NAME = "name";
    private static final Log LOG = LogFactory.getLog(RocketMqServer.class);
    private final DynamicObject cfg;
    private final Map<String, DefaultMQProducer> producers = new ReadLockFreeMap();
    private final Map<String, DefaultMQPushConsumer> consumers = new HashMap();

    /* loaded from: input_file:kd/isc/iscb/platform/core/dc/mq/rocketmq/RocketMqServer$TopicTagData.class */
    public static class TopicTagData {
        private String topic;
        private String tag;
        private String group;

        public TopicTagData(String str, String str2) {
            this.tag = str2;
            int indexOf = str.indexOf("::");
            if (indexOf > 0) {
                this.group = str.substring(0, indexOf);
                str = str.substring(indexOf + 2);
            }
            this.topic = str;
            if (str.contains("@")) {
                String[] split = str.split("@", 2);
                this.topic = split[0];
                this.tag = split[1];
            }
        }

        public String getTopic() {
            return this.topic;
        }

        public String getTag() {
            return this.tag;
        }

        public String getGroup() {
            return this.group;
        }
    }

    public RocketMqServer(DynamicObject dynamicObject) {
        this.cfg = dynamicObject;
        checkRocketMq(dynamicObject);
    }

    @Override // kd.isc.iscb.platform.core.dc.mq.MessageQueueServer
    public DynamicObject getConfig() {
        return this.cfg;
    }

    @Override // kd.isc.iscb.platform.core.dc.mq.MessageQueueServer
    public long getId() {
        return D.l(this.cfg.getPkValue());
    }

    @Override // kd.isc.iscb.platform.core.dc.mq.MessageQueueServer
    public synchronized void attachListener(String str, MessageReceiver messageReceiver) {
        if (this.consumers.containsKey(str)) {
            throw new UnsupportedOperationException(String.format(String.format(ResManager.loadKDString("RocketMQ主题（%s）已订阅！", "RocketMqServer_2", "isc-iscb-platform-core", new Object[0]), str), new Object[0]));
        }
        try {
            this.consumers.put(str, createConsumer(str, messageReceiver));
        } catch (Exception e) {
            throw ConnectorError.ROCKET_CONNECT_ERROR.create(e, new String[]{this.cfg.getString("name"), this.cfg.getString("number"), StringUtil.getMessage(e)});
        }
    }

    @Override // kd.isc.iscb.platform.core.dc.mq.MessageQueueServer
    public synchronized void detachListeners() {
        Iterator<Map.Entry<String, DefaultMQPushConsumer>> it = this.consumers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, DefaultMQPushConsumer> next = it.next();
            it.remove();
            DefaultMQPushConsumer value = next.getValue();
            if (value != null) {
                try {
                    value.shutdown();
                } catch (Exception e) {
                    LOG.warn("failed to close consumer. consumer = " + value, e);
                }
            }
        }
    }

    @Override // kd.isc.iscb.platform.core.dc.mq.MessageQueueServer
    public void publish(String str, PublishedMessage publishedMessage) {
        TopicTagData topicTagData = new TopicTagData(str, null);
        String topic = topicTagData.getTopic();
        String tag = topicTagData.getTag();
        DefaultMQProducer producer = getProducer(topic, topicTagData.getGroup());
        Message message = new Message(topic, tag, publishedMessage.getData());
        message.setKeys(D.s(Long.valueOf(publishedMessage.getId())));
        int delayTime = getDelayTime(str);
        if (delayTime != -1) {
            message.setDelayTimeLevel(delayTime);
        }
        try {
            producer.send(message, new RocketSendCallback(publishedMessage));
        } catch (Exception e) {
            publishedMessage.setFailed(e);
            throw ConnectorError.MQ_PUBLISH_MESSAGE_FAILURE.wrap(e);
        }
    }

    private int getDelayTime(String str) {
        String string = PublishTopicSchema.get(str).getConfig().getString("delay_time");
        if (D.s(string) == null) {
            return -1;
        }
        return Integer.parseInt(string);
    }

    private DefaultMQPushConsumer createConsumer(String str, MessageReceiver messageReceiver) throws MQClientException {
        TopicTagData topicTagData = new TopicTagData(str, "*");
        String topic = topicTagData.getTopic();
        String tag = topicTagData.getTag();
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(topicTagData.getGroup(), createRpcHook(), new AllocateMessageQueueAveragely());
        defaultMQPushConsumer.setNamesrvAddr(D.s(this.cfg.get(BOOTSTRAP_SERVERS)).replace(" ", MappingResultImportJob.EMPTY_STR).replace(',', ';'));
        defaultMQPushConsumer.subscribe(topic, tag);
        defaultMQPushConsumer.setConsumeThreadMin(1);
        defaultMQPushConsumer.setConsumeThreadMax(20);
        defaultMQPushConsumer.setPullInterval(1000L);
        defaultMQPushConsumer.registerMessageListener(new RocketMessageListener(messageReceiver));
        defaultMQPushConsumer.setConsumeMessageBatchMaxSize(1);
        defaultMQPushConsumer.setInstanceName(topic + System.currentTimeMillis());
        defaultMQPushConsumer.start();
        return defaultMQPushConsumer;
    }

    private RPCHook createRpcHook() {
        String s = D.s(this.cfg.get(K3CloudConstant.USER));
        String s2 = D.s(this.cfg.get(K3CloudConstant.PCODE));
        if (StringUtils.isNotEmpty(s) && StringUtils.isNotEmpty(s2)) {
            return new AclClientRPCHook(new SessionCredentials(s, s2));
        }
        return null;
    }

    private DefaultMQProducer getProducer(String str, String str2) {
        DefaultMQProducer defaultMQProducer = this.producers.get(str);
        return defaultMQProducer != null ? defaultMQProducer : createPublisher(str, str2);
    }

    private synchronized DefaultMQProducer createPublisher(String str, String str2) {
        DefaultMQProducer defaultMQProducer = this.producers.get(str);
        if (defaultMQProducer == null) {
            defaultMQProducer = new DefaultMQProducer(str, createRpcHook());
            if (!StringUtil.isEmpty(str2)) {
                defaultMQProducer.setProducerGroup(str2);
            }
            defaultMQProducer.setNamesrvAddr(D.s(this.cfg.get(BOOTSTRAP_SERVERS)).replace(" ", MappingResultImportJob.EMPTY_STR).replace(',', ';'));
            defaultMQProducer.setInstanceName(str + System.currentTimeMillis());
            defaultMQProducer.setRetryTimesWhenSendAsyncFailed(0);
            try {
                defaultMQProducer.start();
                this.producers.put(str, defaultMQProducer);
            } catch (MQClientException e) {
                defaultMQProducer.shutdown();
                throw ConnectorError.ROCKET_CONNECT_ERROR.create(e, new String[]{this.cfg.getString("name"), this.cfg.getString("number"), e.getMessage()});
            }
        }
        return defaultMQProducer;
    }

    private void checkRocketMq(DynamicObject dynamicObject) {
        DefaultMQAdminExt defaultMQAdminExt = null;
        try {
            try {
                defaultMQAdminExt = new DefaultMQAdminExt(createRpcHook());
                defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
                defaultMQAdminExt.setNamesrvAddr(D.s(dynamicObject.get(BOOTSTRAP_SERVERS)).replace(" ", MappingResultImportJob.EMPTY_STR).replace(',', ';'));
                defaultMQAdminExt.start();
                defaultMQAdminExt.fetchAllTopicList();
                if (defaultMQAdminExt != null) {
                    defaultMQAdminExt.shutdown();
                }
            } catch (MQClientException | RemotingException | InterruptedException e) {
                throw ConnectorError.ROCKET_CONNECT_ERROR.create(e, new String[]{dynamicObject.getString("name"), dynamicObject.getString("number"), e.getMessage()});
            }
        } catch (Throwable th) {
            if (defaultMQAdminExt != null) {
                defaultMQAdminExt.shutdown();
            }
            throw th;
        }
    }
}
