package kd.bos.mq.kafka;

import java.util.Date;
import java.util.concurrent.locks.ReentrantLock;
import kd.bos.exception.BosErrorCode;
import kd.bos.exception.KDException;
import kd.bos.kafka.KafkamqFactory;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.mq.MessageConsumer;
import kd.bos.mq.config.QueueDef;
import kd.bos.mq.rabbit.ExceptionLogger;
import kd.bos.mq.support.Consumer;
import kd.bos.mq.support.ConsumerManager;
import kd.bos.mq.support.QueueManager;
import kd.bos.util.ExceptionUtils;
import kd.bos.util.StringUtils;

/* loaded from: input_file:kd/bos/mq/kafka/KafkaConsumer.class */
public class KafkaConsumer implements Consumer {
    private static final Log LOGGER = LogFactory.getLog(KafkaConsumer.class);
    private String queue;
    private boolean autoAck;
    private int concurrency;
    private MessageConsumer mc;
    private String region;
    private org.apache.kafka.clients.consumer.KafkaConsumer<String, byte[]> consumer;
    private volatile boolean isStartedFlag;
    private Date startAt;
    private String topic;
    private String appId;

    public KafkaConsumer(String str, String str2, boolean z, int i, MessageConsumer messageConsumer, int i2) {
        this.region = str;
        this.queue = str2;
        this.concurrency = i;
        this.mc = messageConsumer;
        this.autoAck = z;
        ConsumerManager.getConsumerounter().inc();
    }

    @Override // kd.bos.mq.support.Consumer
    public String getRegion() {
        return this.region;
    }

    @Override // kd.bos.mq.support.Consumer
    public void setConcurrency(int i) {
        this.concurrency = i;
    }

    @Override // kd.bos.mq.support.Consumer
    public int getConcurrency() {
        return this.concurrency;
    }

    @Override // kd.bos.mq.support.Consumer
    public String getQueueName() {
        return this.queue;
    }

    @Override // kd.bos.mq.support.Consumer
    public boolean isAutoAck() {
        return this.autoAck;
    }

    @Override // kd.bos.mq.support.Consumer
    public MessageConsumer getMessageConsumer() {
        return this.mc;
    }

    @Override // kd.bos.mq.support.Consumer
    public Date getStartAt() {
        return this.startAt;
    }

    @Override // kd.bos.mq.support.Consumer
    public void start() {
        try {
            this.isStartedFlag = true;
            this.startAt = new Date();
            QueueDef queueDefWithRealQueueName = QueueManager.getQueueDefWithRealQueueName(this.region, this.queue);
            this.topic = KafkaTopicManager.getTopic(KafkamqFactory.getKafkaInfo(ProducerFactory.getRegionServerKey(this.region)).getVhost(), this.region, this.queue);
            QueueManager.declareIfNeed(null, this.region, this.queue, queueDefWithRealQueueName.getMaxQueueLength());
            this.consumer = ConsumerFactory.getConsumer(this.region, this.queue);
            String appid = queueDefWithRealQueueName.getAppid();
            if (StringUtils.isEmpty(appid)) {
                this.appId = KafkaConstants.DEFAULT_APPID;
            } else {
                this.appId = appid;
            }
            if (ConsumerFactory.getAppIdPollThread(this.appId) == null) {
                ReentrantLock reentrantLock = new ReentrantLock();
                String str = "kafkaConsumer-poll-" + this.region + "-" + this.appId;
                PollThread pollThread = new PollThread(str, this.region, this.appId, this.consumer, reentrantLock);
                ConsumerFactory.putAppIdPollThread(this.appId, pollThread);
                pollThread.start();
                LOGGER.info(str + " start...");
                String str2 = "kafkaConsumer-commit-" + this.region + "-" + this.appId;
                CommitThread commitThread = new CommitThread(str2, this.appId, this.consumer, reentrantLock, pollThread);
                ConsumerFactory.putAppIdSubmitThread(this.appId, commitThread);
                commitThread.start();
                LOGGER.info(str2 + " start...");
            }
        } catch (Exception e) {
            ExceptionLogger.log("Can't init consumer for queue " + this.queue, e);
            throw new KDException(e, BosErrorCode.mqException, new Object[]{"Can't init consumer for queue " + this.queue});
        }
    }

    @Override // kd.bos.mq.support.Consumer
    public void $$stop() {
        try {
            this.isStartedFlag = false;
            LOGGER.info("print stack for appId={},region={},queue={},topic={} to debug:{}", new Object[]{this.appId, this.region, this.queue, this.topic, ExceptionUtils.getExceptionStackTraceMessage(new Exception())});
            KafkaTopicManager.removeTopic(this.appId, this.topic);
        } catch (Exception e) {
            ExceptionLogger.log("error when stop mqchannel" + this.queue, e);
        }
    }

    @Override // kd.bos.mq.support.Consumer
    public boolean isStarted() {
        return this.isStartedFlag;
    }
}
