package kd.bos.mq.kafka;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import kd.bos.kafka.KafkamqFactory;
import kd.bos.mq.config.ConfigKeys;
import kd.bos.mq.delay.DelayControlManager;
import kd.bos.mq.delay.MetaTime;
import kd.bos.mq.kafka.KafkaConstants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;

/* JADX INFO: Access modifiers changed from: package-private */
/* compiled from: KafkaDelayManager.java */
/* loaded from: input_file:kd/bos/mq/kafka/KafkaDelayConsumer.class */
/**
 * Polls a set of delay topics and forwards each record once its delay has elapsed.
 *
 * <p>Every {@link MetaTime} passed to the constructor maps to one delay topic (via
 * {@code KafkaDelayManager.getDelayTopicName}) and to a "unit" duration in seconds. A record is
 * held in its delay topic until {@code record.timestamp() + unit} has passed. It is then either
 * published to its target topic (when its {@code startDeliverTime} header is due) or re-published
 * to another delay topic chosen by {@code DelayControlManager.selectMaxMetaTime}.
 *
 * <p>Threading: {@link #start()} runs the poll loop on the calling thread and owns the Kafka
 * consumer for its whole life cycle (create, poll, close). {@link #stop()} may be called from any
 * thread; it only flips the running flag and wakes the consumer up, because the Kafka consumer
 * does not allow concurrent access from several threads. {@code start()} is expected to be
 * invoked at most once per instance.
 */
public class KafkaDelayConsumer {
    /** Result of {@code processRecord} meaning "record dispatched and its offset committed". */
    private static final long RECORD_DONE = -1L;

    /** Pause before re-reading a record that sits in the 1-second topic but is still not due. */
    private static final long ONE_SECOND_TOPIC_RETRY_WAIT_MILLIS = 500L;

    private final String delayTopicLevel;
    private final List<MetaTime> metaTimeList;
    /** Written by the polling thread in {@link #start()}, read by {@link #stop()}; hence volatile. */
    private volatile org.apache.kafka.clients.consumer.KafkaConsumer<String, byte[]> consumer;
    private final String groupId;
    private final CountDownLatch countDownLatch;
    private volatile boolean isRunning;
    /** Delay topic name -> time (in whole seconds) a record has to stay in that topic. */
    private final Map<String, Integer> topic2unitsecondsMap;

    /**
     * @param delayTopicLevel level name; compared against {@code KafkaDelayManager.DELAYTOPIC_LEVEL_LOW}
     *                        to pick the poll and wait timeouts
     * @param metaTimeList    delay units handled by this consumer, one delay topic per entry
     * @param groupId         Kafka consumer group id
     * @param countDownLatch  counted down once in {@link #start()} right after the subscription is made
     */
    public KafkaDelayConsumer(String delayTopicLevel, List<MetaTime> metaTimeList, String groupId, CountDownLatch countDownLatch) {
        this.delayTopicLevel = delayTopicLevel;
        this.metaTimeList = metaTimeList;
        this.groupId = groupId;
        this.countDownLatch = countDownLatch;
        this.topic2unitsecondsMap = new HashMap<>(metaTimeList.size());
        for (MetaTime metaTime : metaTimeList) {
            this.topic2unitsecondsMap.put(KafkaDelayManager.getDelayTopicName(metaTime.getName()), metaTime.getMillis() / 1000);
        }
    }

    /** Returns the delay topic names to subscribe to, in {@code metaTimeList} order. */
    private List<String> getDelayTopicList() {
        List<String> topics = new ArrayList<>(this.metaTimeList.size());
        for (MetaTime metaTime : this.metaTimeList) {
            topics.add(KafkaDelayManager.getDelayTopicName(metaTime.getName()));
        }
        return topics;
    }

    /** True when this consumer serves the low delay level. */
    private boolean isLowLevel() {
        return this.delayTopicLevel.equals(KafkaDelayManager.DELAYTOPIC_LEVEL_LOW);
    }

    /** Poll timeout in milliseconds, overridable through a level-specific system property. */
    private long getPollTimeoutMillis() {
        if (isLowLevel()) {
            return Long.getLong("mq.kafka.low.delayconsumer.poll.timeout", 500L);
        }
        return Long.getLong("mq.kafka.high.delayconsumer.poll.timeout", 1000L);
    }

    /** Pause (milliseconds) after hitting a record that has not yet spent its unit time in its topic. */
    private long getUnexpiredRecordWaitMillis() {
        if (isLowLevel()) {
            return Long.getLong("mq.kafka.low.delayconsumer.poll.unexpired.wait", 500L);
        }
        return Long.getLong("mq.kafka.high.delayconsumer.poll.unexpired.wait", 4000L);
    }

    /**
     * Creates the consumer, subscribes to the delay topics, counts the latch down and then polls
     * until {@link #stop()} is called or the calling thread is interrupted.
     *
     * <p>The consumer is always closed on the calling thread before this method returns, also when
     * the loop ends with an exception.
     */
    public void start() {
        this.isRunning = true;
        Properties consumerConfig = KafkaConfig.getConsumerConfig(KafkamqFactory.getKafkaInfo(ConfigKeys.MQ_SERVER_KEY));
        consumerConfig.put("group.id", this.groupId);
        org.apache.kafka.clients.consumer.KafkaConsumer<String, byte[]> kafkaConsumer =
                new org.apache.kafka.clients.consumer.KafkaConsumer<>(consumerConfig);
        this.consumer = kafkaConsumer;
        try {
            kafkaConsumer.subscribe(getDelayTopicList());
            this.countDownLatch.countDown();
            while (this.isRunning) {
                ConsumerRecords<String, byte[]> records;
                try {
                    records = kafkaConsumer.poll(Duration.ofMillis(getPollTimeoutMillis()));
                } catch (RuntimeException e) {
                    if (!this.isRunning) {
                        // stop() woke the consumer up to abort the blocking poll: regular shutdown.
                        break;
                    }
                    throw e;
                }
                if (!records.isEmpty() && !processBatch(records)) {
                    break;
                }
            }
        } finally {
            try {
                kafkaConsumer.close();
            } catch (RuntimeException e) {
                // Nothing left to clean up here; do not let a close failure hide the loop's own exception.
                e.printStackTrace();
            }
        }
    }

    /**
     * Handles one poll result partition by partition.
     *
     * <p>When a record is not yet due, its partition is rewound to that record and the rest of
     * that partition is left for a later poll, while the remaining partitions are still processed.
     * (poll() has already moved the position of every returned partition past the returned
     * records, so abandoning the whole batch would silently skip the other partitions' records.)
     * After the batch, the thread sleeps once for the longest wait requested by a rewound partition.
     *
     * <p>A record whose processing fails is reported and skipped, as before.
     *
     * @return {@code false} when the poll loop has to end (stop requested or thread interrupted)
     */
    private boolean processBatch(ConsumerRecords<String, byte[]> records) {
        long backoffMillis = 0L;
        for (TopicPartition partition : records.partitions()) {
            for (ConsumerRecord<String, byte[]> record : records.records(partition)) {
                long retryAfterMillis;
                try {
                    retryAfterMillis = processRecord(record);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller and end the loop.
                    Thread.currentThread().interrupt();
                    return false;
                } catch (Exception e) {
                    if (!this.isRunning) {
                        // Most likely the wakeup triggered by stop(); not worth a stack trace.
                        return false;
                    }
                    e.printStackTrace();
                    continue;
                }
                if (retryAfterMillis != RECORD_DONE) {
                    backoffMillis = Math.max(backoffMillis, retryAfterMillis);
                    break;
                }
            }
        }
        if (backoffMillis > 0L) {
            try {
                Thread.sleep(backoffMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    /**
     * Forwards a single record if it is due.
     *
     * @return {@link #RECORD_DONE} when the record was dispatched and its offset committed;
     *         otherwise the non-negative number of milliseconds to wait before polling again,
     *         in which case the record's partition has been rewound to this record
     * @throws IllegalStateException when a mandatory header or the topic's unit time is missing
     */
    private long processRecord(ConsumerRecord<String, byte[]> record) throws InterruptedException, ExecutionException {
        Map<String, Object> headers = getRecordHeaderMap(record);
        Object targetTopicHeader = headers.get(KafkaConstants.DelayConstants.TARGET_TOPIC);
        byte[] startDeliverTimeBytes = (byte[]) headers.get("startDeliverTime");
        if (targetTopicHeader == null || startDeliverTimeBytes == null) {
            throw new IllegalStateException("Delay record without target topic or startDeliverTime header: topic="
                    + record.topic() + ", partition=" + record.partition() + ", offset=" + record.offset());
        }
        String targetTopic = targetTopicHeader.toString();
        long startDeliverTime = ClassCastUtil.byteArrayToLong(startDeliverTimeBytes);
        // Optional header: stays null when the record carries no retry counter.
        byte[] retryTimes = (byte[]) headers.get(KafkaConstants.RetryConstants.RETRY_TIMES);

        String delayTopic = record.topic();
        Integer configuredUnitSeconds = this.topic2unitsecondsMap.get(delayTopic);
        if (configuredUnitSeconds == null) {
            throw new IllegalStateException("No delay unit configured for topic " + delayTopic);
        }
        int unitSeconds = configuredUnitSeconds;
        TopicPartition partition = new TopicPartition(delayTopic, record.partition());
        long now = System.currentTimeMillis();

        // All time arithmetic stays in long: narrowing the millisecond difference to int wraps
        // around for differences beyond ~24.8 days and flips the sign of the result.
        long remainingInTopicMillis = record.timestamp() + unitSeconds * 1000L - now;
        if (remainingInTopicMillis / 1000 > 0) {
            // Still has to sit in this delay topic: re-read it on a later poll.
            this.consumer.seek(partition, record.offset());
            return Math.max(0L, getUnexpiredRecordWaitMillis());
        }

        long remainingDelaySeconds = (startDeliverTime - now) / 1000;
        String dispatchTopic;
        if (remainingDelaySeconds > 0) {
            int delaySeconds = (int) Math.min(remainingDelaySeconds, Integer.MAX_VALUE);
            MetaTime nextMetaTime = DelayControlManager.selectMaxMetaTime(delaySeconds);
            if (nextMetaTime == MetaTime.delay_1s && unitSeconds == 1) {
                // Already in the 1-second topic: wait briefly and re-read instead of re-publishing.
                this.consumer.seek(partition, record.offset());
                return ONE_SECOND_TOPIC_RETRY_WAIT_MILLIS;
            }
            dispatchTopic = KafkaDelayManager.getDelayTopicName(nextMetaTime.getName());
        } else {
            dispatchTopic = targetTopic;
        }

        dispatchMessage(dispatchTopic, record.value(), targetTopic, startDeliverTimeBytes, retryTimes);
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(partition, new OffsetAndMetadata(record.offset() + 1));
        this.consumer.commitSync(offsets);
        return RECORD_DONE;
    }

    /**
     * Publishes the payload to {@code str} and blocks until the send has completed.
     *
     * @param str   topic to publish to (target topic or next delay topic)
     * @param bArr  message payload
     * @param str2  final target topic, carried along as a header
     * @param bArr2 raw {@code startDeliverTime} header value, copied unchanged
     * @param bArr3 raw retry-times header value, or {@code null} to omit the header
     */
    // The record stays raw because the parameter type of KafkaDelayManager.dispatchMessage is not
    // visible from this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void dispatchMessage(String str, byte[] bArr, String str2, byte[] bArr2, byte[] bArr3) throws InterruptedException, ExecutionException {
        ProducerRecord producerRecord = new ProducerRecord(str, bArr);
        producerRecord.headers().add(KafkaConstants.DelayConstants.TARGET_TOPIC, str2.getBytes(StandardCharsets.UTF_8));
        producerRecord.headers().add("startDeliverTime", bArr2);
        if (bArr3 != null) {
            producerRecord.headers().add(KafkaConstants.RetryConstants.RETRY_TIMES, bArr3);
        }
        KafkaDelayManager.dispatchMessage(producerRecord).get();
    }

    /**
     * Extracts the headers this consumer cares about. The target topic is decoded to a
     * {@code String}; {@code startDeliverTime} and the retry counter are kept as raw bytes.
     * Headers with other keys are ignored.
     */
    private Map<String, Object> getRecordHeaderMap(ConsumerRecord<String, byte[]> consumerRecord) {
        Map<String, Object> headerMap = new HashMap<>();
        for (Header header : consumerRecord.headers()) {
            String key = header.key();
            if (key.equals(KafkaConstants.DelayConstants.TARGET_TOPIC)) {
                headerMap.put(KafkaConstants.DelayConstants.TARGET_TOPIC, new String(header.value(), StandardCharsets.UTF_8));
            } else if (key.equals("startDeliverTime")) {
                headerMap.put("startDeliverTime", header.value());
            } else if (key.equals(KafkaConstants.RetryConstants.RETRY_TIMES)) {
                headerMap.put(KafkaConstants.RetryConstants.RETRY_TIMES, header.value());
            }
        }
        return headerMap;
    }

    /**
     * Asks the poll loop to finish. Safe to call from any thread and before {@link #start()}.
     *
     * <p>The consumer is not closed here: it is closed by the thread running {@link #start()}
     * once the loop has ended, so this method may return before the close has happened.
     */
    public void stop() {
        this.isRunning = false;
        org.apache.kafka.clients.consumer.KafkaConsumer<String, byte[]> running = this.consumer;
        if (running != null) {
            // wakeup() is the one consumer call that is allowed from a foreign thread; it aborts a blocking poll.
            running.wakeup();
        }
    }
}
