package kd.isc.iscb.platform.core.dc.mq.kafka;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import kd.bos.dataentity.entity.DynamicObject;
import kd.bos.dataentity.resource.ResManager;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.isc.iscb.platform.core.dc.mq.MessageQueueServer;
import kd.isc.iscb.platform.core.dc.mq.MessageReceiver;
import kd.isc.iscb.platform.core.dc.mq.PublishedMessage;
import kd.isc.iscb.util.data.ReadLockFreeMap;
import kd.isc.iscb.util.dt.D;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/* loaded from: input_file:kd/isc/iscb/platform/core/dc/mq/kafka/KafkaServer.class */
public class KafkaServer implements MessageQueueServer {
    private static Log logger = LogFactory.getLog(KafkaServer.class);
    private Map<String, Publisher> publishers = new ReadLockFreeMap();
    private Map<String, KafkaConsumerTask> consumers = new HashMap();
    private AdminClient adminClient;
    private DynamicObject cfg;
    private String bootstrapServers;
    private int numPartitions;
    private int numReplication;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kd/isc/iscb/platform/core/dc/mq/kafka/KafkaServer$Publisher.class */
    public static class Publisher {
        private KafkaProducer<String, byte[]> producer;
        private String topic;
        private String bootstrapServers;
        private DynamicObject cfg;
        private String customParam;

        private Publisher(String str, String str2, DynamicObject dynamicObject, String str3) {
            this.topic = str;
            this.bootstrapServers = str2;
            this.cfg = dynamicObject;
            this.customParam = str3;
            this.producer = createProducer();
        }

        private KafkaProducer<String, byte[]> createProducer() {
            return innerCreateProducer();
        }

        private KafkaProducer<String, byte[]> innerCreateProducer() {
            HashMap hashMap = new HashMap(4);
            hashMap.put("bootstrap.servers", this.bootstrapServers);
            hashMap.put("acks", "all");
            hashMap.put("linger.ms", "1000");
            hashMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            hashMap.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            KafkaUtil.checkSaslJaas(hashMap, this.cfg);
            KafkaUtil.setCustomParam(hashMap, this.customParam);
            return new KafkaProducer<>(hashMap);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void send(PublishedMessage publishedMessage) {
            if (this.producer == null) {
                this.producer = createProducer();
                KafkaServer.logger.info("Kafka主题（" + this.topic + "）生产者创建成功");
            }
            publish(publishedMessage);
        }

        private void publish(PublishedMessage publishedMessage) {
            this.producer.send(new ProducerRecord(this.topic, publishedMessage.getData()), new KafkaCallback(publishedMessage));
            this.producer.flush();
            KafkaServer.logger.info("Kafka主题（" + this.topic + "）发送消息成功");
        }
    }

    public KafkaServer(AdminClient adminClient, DynamicObject dynamicObject, String str, int i) {
        this.numPartitions = 1;
        this.numReplication = 1;
        this.adminClient = adminClient;
        this.cfg = dynamicObject;
        this.bootstrapServers = str;
        if (i > 1) {
            this.numPartitions = i;
            this.numReplication = i;
        }
    }

    @Override // kd.isc.iscb.platform.core.dc.mq.MessageQueueServer
    public DynamicObject getConfig() {
        return this.cfg;
    }

    @Override // kd.isc.iscb.platform.core.dc.mq.MessageQueueServer
    public long getId() {
        return D.l(this.cfg.getPkValue());
    }

    @Override // kd.isc.iscb.platform.core.dc.mq.MessageQueueServer
    public synchronized void attachListener(String str, MessageReceiver messageReceiver) {
        if (this.consumers.containsKey(str)) {
            throw new UnsupportedOperationException(String.format(ResManager.loadKDString("Kafka主题（%s）已订阅！", "KafkaServer_2", "isc-iscb-platform-core", new Object[0]), str));
        }
        KafkaConsumerTask kafkaConsumerTask = new KafkaConsumerTask(this.bootstrapServers, str, str, messageReceiver, this.cfg);
        KafkaThreadPool.execute(kafkaConsumerTask);
        this.consumers.put(str, kafkaConsumerTask);
        logger.info("Kafka主题（" + str + "）消费者创建成功");
    }

    @Override // kd.isc.iscb.platform.core.dc.mq.MessageQueueServer
    public synchronized void attachListener(String str, MessageReceiver messageReceiver, String str2) {
        if (this.consumers.containsKey(str)) {
            throw new UnsupportedOperationException(String.format(ResManager.loadKDString("Kafka主题（%s）已订阅！", "KafkaServer_2", "isc-iscb-platform-core", new Object[0]), str));
        }
        KafkaConsumerTask kafkaConsumerTask = new KafkaConsumerTask(this.bootstrapServers, str, str, messageReceiver, this.cfg, str2);
        KafkaThreadPool.execute(kafkaConsumerTask);
        this.consumers.put(str, kafkaConsumerTask);
        logger.info("Kafka主题（" + str + "）消费者创建成功");
    }

    @Override // kd.isc.iscb.platform.core.dc.mq.MessageQueueServer
    public synchronized void detachListeners() {
        Iterator<Map.Entry<String, KafkaConsumerTask>> it = this.consumers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, KafkaConsumerTask> next = it.next();
            it.remove();
            KafkaConsumerTask value = next.getValue();
            try {
                value.pause();
            } catch (Exception e) {
                logger.warn("failed to close KafkaConsumerTask. KafkaConsumerTask = " + value, e);
            }
        }
    }

    @Override // kd.isc.iscb.platform.core.dc.mq.MessageQueueServer
    public void publish(String str, PublishedMessage publishedMessage) {
        getProducer(str, null).send(publishedMessage);
    }

    @Override // kd.isc.iscb.platform.core.dc.mq.MessageQueueServer
    public void publish(String str, PublishedMessage publishedMessage, String str2) {
        getProducer(str, str2).send(publishedMessage);
    }

    private Publisher getProducer(String str, String str2) {
        Publisher publisher = this.publishers.get(str);
        return publisher != null ? publisher : createPublisher(str, str2);
    }

    private synchronized Publisher createPublisher(String str, String str2) {
        Publisher publisher = this.publishers.get(str);
        if (publisher == null) {
            publisher = new Publisher(str, this.bootstrapServers, this.cfg, str2);
            this.publishers.put(str, publisher);
        }
        return publisher;
    }
}
