package kd.bos.mq.kafka;

import java.time.Duration;
import java.util.Calendar;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.mq.delay.MetaTime;
import kd.bos.mq.jms.JMSSessionFactory;
import kd.bos.mq.kafka.KafkaConstants;
import kd.bos.mq.rabbit.ExceptionLogger;
import kd.bos.mq.support.ConsumerManager;
import kd.bos.mq.support.QueueManager;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;

/* compiled from: KafkaConsumer.java */
/* loaded from: input_file:kd/bos/mq/kafka/PollThread.class */
class PollThread extends Thread {
    private static final Log LOGGER = LogFactory.getLog(PollThread.class);
    private ReentrantLock syncLock;
    private String region;
    private String appId;
    private CopyOnWriteArraySet<String> topicSet;
    private int topicSize;
    private String groupId;
    private org.apache.kafka.clients.consumer.KafkaConsumer<String, byte[]> consumer;
    private Map<String, Semaphore> TOPIC_SEMAPHORE_MAP = new HashMap();
    private ArrayBlockingQueue<ConsumerRecord<String, byte[]>> recordQueue = new ArrayBlockingQueue<>(1000);
    private ArrayBlockingQueue<Map<TopicPartition, OffsetAndMetadata>> offsetQueue = new ArrayBlockingQueue<>(10000);

    public PollThread(String str, String str2, String str3, org.apache.kafka.clients.consumer.KafkaConsumer<String, byte[]> kafkaConsumer, ReentrantLock reentrantLock) {
        setName(str);
        this.region = str2;
        this.appId = str3;
        this.consumer = kafkaConsumer;
        this.groupId = kafkaConsumer.groupMetadata().groupId();
        this.topicSet = KafkaTopicManager.getTopicSet(str3);
        this.topicSize = this.topicSet.size();
        this.syncLock = reentrantLock;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        syncSubscribe(this.topicSet);
        pollAndHandleMessage();
        this.consumer.close();
    }

    private void pollAndHandleMessage() {
        int parseInt;
        while (this.topicSize > 0) {
            String str = "";
            try {
                parseInt = Integer.parseInt(System.getProperty(KafkaConstants.RetryConstants.MQ_KAFKA_CONSUMER_RETRY_TIMES, "16"));
                while (this.recordQueue.size() > 0) {
                    try {
                        handleMemoryQueueFirst(parseInt);
                    } catch (InterruptedException e) {
                        ExceptionLogger.log("memoryQueue take InterruptedException", e);
                    }
                }
            } catch (Exception e2) {
                ExceptionLogger.log(Thread.currentThread().getName() + " pollAndHandleMessage topic=" + str + " error", e2);
            }
            if (this.topicSize != KafkaTopicManager.getTopicSet(this.appId).size() && handleTopicsChanged()) {
                return;
            }
            ConsumerRecords<String, byte[]> syncPoll = syncPoll();
            if (!syncPoll.isEmpty()) {
                Iterator it = syncPoll.iterator();
                while (it.hasNext()) {
                    ConsumerRecord<String, byte[]> consumerRecord = (ConsumerRecord) it.next();
                    str = consumerRecord.topic();
                    String regionByTopic = KafkaTopicManager.getRegionByTopic(str);
                    String queueByTopic = KafkaTopicManager.getQueueByTopic(str);
                    if (QueueManager.getConsumers().containsKey(regionByTopic + queueByTopic)) {
                        Semaphore semaphore = this.TOPIC_SEMAPHORE_MAP.get(str);
                        if (semaphore == null) {
                            semaphore = new Semaphore(2);
                            this.TOPIC_SEMAPHORE_MAP.put(str, semaphore);
                        }
                        try {
                        } catch (InterruptedException e3) {
                            ExceptionLogger.log("semaphore InterruptedException", e3);
                        }
                        if (semaphore.tryAcquire(10L, TimeUnit.MILLISECONDS) || !this.recordQueue.offer(consumerRecord, 1000L, TimeUnit.MILLISECONDS)) {
                            Semaphore semaphore2 = semaphore;
                            KafkaConsumerWorkerPool.execute(() -> {
                                int i;
                                try {
                                    try {
                                        KafkaAcker kafkaAcker = new KafkaAcker();
                                        int headerRetryTimes = getHeaderRetryTimes(consumerRecord);
                                        handleDelivery(consumerRecord, kafkaAcker, headerRetryTimes);
                                        if (kafkaAcker.isDenied() && (i = headerRetryTimes + 1) < parseInt) {
                                            dispatchToDelayTopic(consumerRecord, i);
                                        }
                                    } catch (Exception e4) {
                                        ExceptionLogger.log("kafkaWorker handleDelivery topic=" + consumerRecord.topic() + " error", e4);
                                        commitOffset(consumerRecord);
                                        semaphore2.release();
                                    }
                                } finally {
                                    commitOffset(consumerRecord);
                                    semaphore2.release();
                                }
                            });
                        }
                    } else {
                        LOGGER.info("kafkaConsumer not find in QueueManager,appId={},topic={},region={},queue={}", new Object[]{this.appId, str, regionByTopic, queueByTopic});
                        try {
                            KafkaDispatchProducer.send(copyRecord(consumerRecord)).get();
                            LOGGER.info("send message back success,appId={},topic={},region={},queue={}", new Object[]{this.appId, consumerRecord.topic(), regionByTopic, queueByTopic});
                            KafkaTopicManager.removeTopic(this.appId, str);
                            LOGGER.info("KafkaTopicManager.removeTopic,appId={},topic={}", this.appId, str);
                            commitOffset(consumerRecord);
                        } catch (Exception e4) {
                            LOGGER.error("kafkaConsumer not find in QueueManager,hander error,appId={},topic={},region={},queue={}", new Object[]{this.appId, str, regionByTopic, queueByTopic});
                        }
                    }
                    ExceptionLogger.log(Thread.currentThread().getName() + " pollAndHandleMessage topic=" + str + " error", e2);
                }
            }
        }
    }

    private ProducerRecord<String, byte[]> copyRecord(ConsumerRecord<String, byte[]> consumerRecord) {
        ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(consumerRecord.topic(), consumerRecord.value());
        Header[] array = consumerRecord.headers().toArray();
        if (array.length > 0) {
            for (Header header : array) {
                producerRecord.headers().add(header.key(), header.value());
            }
        }
        return producerRecord;
    }

    private void dispatchToDelayTopic(ConsumerRecord<String, byte[]> consumerRecord, int i) throws ExecutionException, InterruptedException {
        MetaTime genInstanceByLevel = MetaTime.genInstanceByLevel(i + 2);
        if (genInstanceByLevel == null) {
            return;
        }
        ProducerRecord producerRecord = new ProducerRecord(KafkaDelayManager.getDelayTopicName(genInstanceByLevel.getName()), consumerRecord.value());
        producerRecord.headers().add(KafkaConstants.DelayConstants.TARGET_TOPIC, consumerRecord.topic().getBytes());
        Calendar calendar = Calendar.getInstance();
        calendar.add(14, genInstanceByLevel.getMillis());
        producerRecord.headers().add("startDeliverTime", ClassCastUtil.longToBytes(calendar.getTime().getTime()));
        producerRecord.headers().add(KafkaConstants.RetryConstants.RETRY_TIMES, ClassCastUtil.int2bytes(i));
        KafkaDispatchProducer.send(producerRecord).get();
    }

    private void commitOffset(ConsumerRecord<String, byte[]> consumerRecord) {
        HashMap hashMap = new HashMap();
        hashMap.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1));
        if (this.offsetQueue.offer(hashMap)) {
            return;
        }
        syncCommit(hashMap);
    }

    private boolean handleTopicsChanged() {
        this.topicSet = KafkaTopicManager.getTopicSet(this.appId);
        if (this.topicSet == null || this.topicSet.size() == 0) {
            return true;
        }
        this.topicSize = this.topicSet.size();
        Iterator<String> it = this.topicSet.iterator();
        while (it.hasNext()) {
            String next = it.next();
            if (!this.TOPIC_SEMAPHORE_MAP.containsKey(next)) {
                this.TOPIC_SEMAPHORE_MAP.put(next, new Semaphore(QueueManager.getQueueDefWithRealQueueName(null, KafkaTopicManager.getQueueByTopic(next)).getConsumers().get(0).getConcurrency()));
            }
        }
        syncSubscribe(this.topicSet);
        return false;
    }

    private void handleMemoryQueueFirst(int i) throws InterruptedException {
        ConsumerRecord<String, byte[]> take = this.recordQueue.take();
        KafkaConsumerWorkerPool.execute(() -> {
            int i2;
            KafkaAcker kafkaAcker = new KafkaAcker();
            int headerRetryTimes = getHeaderRetryTimes(take);
            try {
                try {
                    handleDelivery(take, kafkaAcker, headerRetryTimes);
                    if (kafkaAcker.isDenied() && (i2 = headerRetryTimes + 1) < i) {
                        dispatchToDelayTopic(take, i2);
                    }
                    commitOffset(take);
                } catch (Exception e) {
                    ExceptionLogger.log("kafkaWorker handleMemoryQueueFirst topic=" + take.topic() + " error", e);
                    commitOffset(take);
                }
            } catch (Throwable th) {
                commitOffset(take);
                throw th;
            }
        });
    }

    private void handleDelivery(ConsumerRecord<String, byte[]> consumerRecord, KafkaAcker kafkaAcker, int i) {
        String str = consumerRecord.topic();
        String str2 = str + JMSSessionFactory.SPLIT + consumerRecord.partition() + JMSSessionFactory.SPLIT + consumerRecord.offset();
        KafkaConsumer kafkaConsumer = (KafkaConsumer) QueueManager.getConsumers().get(KafkaTopicManager.getRegionByTopic(str) + KafkaTopicManager.getQueueByTopic(str));
        ConsumerManager.innerHandleDelivery(kafkaAcker, kafkaConsumer.getMessageConsumer(), kafkaConsumer.getRegion(), kafkaConsumer.getQueueName(), str2, i, (byte[]) consumerRecord.value(), str);
    }

    private int getHeaderRetryTimes(ConsumerRecord<String, byte[]> consumerRecord) {
        int i = 0;
        Map<String, Object> recordHeaderMap = getRecordHeaderMap(consumerRecord);
        if (recordHeaderMap.containsKey(KafkaConstants.RetryConstants.RETRY_TIMES)) {
            i = ClassCastUtil.bytes2Int((byte[]) recordHeaderMap.get(KafkaConstants.RetryConstants.RETRY_TIMES));
        }
        return i;
    }

    private Map<String, Object> getRecordHeaderMap(ConsumerRecord<String, byte[]> consumerRecord) {
        Header[] array = consumerRecord.headers().toArray();
        HashMap hashMap = new HashMap(array.length);
        for (Header header : array) {
            String key = header.key();
            if (key.equals(KafkaConstants.DelayConstants.TARGET_TOPIC)) {
                hashMap.put(KafkaConstants.DelayConstants.TARGET_TOPIC, new String(header.value()));
            } else if (key.equals("startDeliverTime")) {
                hashMap.put("startDeliverTime", header.value());
            } else if (key.equals(KafkaConstants.RetryConstants.RETRY_TIMES)) {
                hashMap.put(KafkaConstants.RetryConstants.RETRY_TIMES, header.value());
            }
        }
        return hashMap;
    }

    private ConsumerRecords<String, byte[]> syncPoll() {
        try {
            this.syncLock.lock();
            return this.consumer.poll(Duration.ofMillis(100L));
        } finally {
            this.syncLock.unlock();
        }
    }

    private void syncSubscribe(Collection<String> collection) {
        try {
            this.syncLock.lock();
            this.consumer.subscribe(new HashSet(collection));
        } finally {
            this.syncLock.unlock();
        }
    }

    private void syncCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        try {
            this.syncLock.lock();
            this.consumer.commitSync(map);
        } finally {
            this.syncLock.unlock();
        }
    }

    public Set<String> getTopicSet() {
        return this.topicSet;
    }

    public Map<TopicPartition, OffsetAndMetadata> pollOffset() {
        try {
            return this.offsetQueue.poll(10L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            ExceptionLogger.log("memoryQueue poll InterruptedException", e);
            return null;
        }
    }
}
