package kd.isc.iscb.platform.core.dc.mq.kafka;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import kd.bos.dataentity.entity.DynamicObject;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.isc.iscb.platform.core.dc.mq.MessageReceiver;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/* loaded from: input_file:kd/isc/iscb/platform/core/dc/mq/kafka/KafkaConsumerTask.class */
public class KafkaConsumerTask implements Runnable {
    private static Log logger = LogFactory.getLog(KafkaConsumerTask.class);
    private final MessageReceiver receiver;
    private final String bootstrapServers;
    private final String groupId;
    private final String topic;
    private final Map<String, Object> configs;
    private final DynamicObject cfg;
    private volatile boolean pauseFlag;

    public KafkaConsumerTask(String str, String str2, String str3, MessageReceiver messageReceiver, DynamicObject dynamicObject) {
        this.pauseFlag = false;
        this.configs = new HashMap();
        this.bootstrapServers = str;
        this.groupId = str2;
        this.topic = str3;
        this.receiver = messageReceiver;
        this.cfg = dynamicObject;
        initConsumerConfig();
    }

    public KafkaConsumerTask(String str, String str2, String str3, MessageReceiver messageReceiver, DynamicObject dynamicObject, String str4) {
        this(str, str2, str3, messageReceiver, dynamicObject);
        KafkaUtil.setCustomParam(this.configs, str4);
    }

    private void initConsumerConfig() {
        this.configs.put("bootstrap.servers", this.bootstrapServers);
        this.configs.put("group.id", this.groupId);
        this.configs.put("enable.auto.commit", Boolean.FALSE);
        this.configs.put("auto.commit.interval.ms", "1000");
        this.configs.put("max.poll.records", "10");
        this.configs.put("session.timeout.ms", "10000");
        this.configs.put("heartbeat.interval.ms", "3000");
        this.configs.put("max.poll.interval.ms", "300000");
        this.configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.configs.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaUtil.checkSaslJaas(this.configs, this.cfg);
    }

    @Override // java.lang.Runnable
    public void run() {
        KafkaConsumer kafkaConsumer = new KafkaConsumer(this.configs);
        kafkaConsumer.subscribe(Collections.singletonList(this.topic));
        while (!this.pauseFlag) {
            Iterator it = kafkaConsumer.poll(Duration.ofMillis(1000L)).iterator();
            while (it.hasNext()) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                try {
                    logger.info("receive success." + consumerRecord);
                    this.receiver.handleMessage((byte[]) consumerRecord.value());
                } catch (Exception e) {
                    logger.warn("failed to handle message." + consumerRecord, e);
                }
            }
            try {
                kafkaConsumer.commitSync();
            } catch (Exception e2) {
                logger.warn("failed to commitSync offset.", e2);
            }
        }
        try {
            kafkaConsumer.close();
        } catch (Exception e3) {
            logger.warn("failed to close consumer." + toString(), e3);
        }
    }

    public void pause() {
        this.pauseFlag = true;
    }

    public String toString() {
        return "KafkaConsumerTask{bootstrapServers='" + this.bootstrapServers + "', groupId='" + this.groupId + "', topic='" + this.topic + "'}";
    }
}
