package com.huawei.it.eip.ump.client.consumer;

import com.alibaba.fastjson.JSON;
import com.huawei.it.eip.ump.client.AbstractClient;
import com.huawei.it.eip.ump.client.admin.UmpAdmin;
import com.huawei.it.eip.ump.client.config.ClientConfigMap;
import com.huawei.it.eip.ump.client.http.HttpFsClient;
import com.huawei.it.eip.ump.client.listener.MessageListener;
import com.huawei.it.eip.ump.common.constant.UmpConstants;
import com.huawei.it.eip.ump.common.exception.UmpException;
import com.huawei.it.eip.ump.common.log.LogOperateType;
import com.huawei.it.eip.ump.common.log.MessageLogBo;
import com.huawei.it.eip.ump.common.message.Message;
import com.huawei.it.eip.ump.common.util.CommonUtils;
import com.huawei.it.eip.ump.common.util.LogUtils;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.ext.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.ext.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.ext.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.ext.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.ext.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

/* loaded from: input_file:com/huawei/it/eip/ump/client/consumer/Consumer.class */
/**
 * Push-style message consumer built on top of {@link DefaultMQPushConsumer}.
 *
 * <p>Typical usage, as far as this class shows: configure the instance through its setters,
 * register a {@link MessageListener} via {@link #subscribe(MessageListener)}, then start the
 * client (the start entry point lives in {@link AbstractClient}). For every delivered message
 * the listener is invoked and a log record is sent one-way through the remoting client.
 */
public class Consumer extends AbstractClient implements UmpAdmin {
    /** Message property read by {@link #getLargeBodySize(Map)} and cleared before consumption. */
    private static final String PROP_LARGE_BODY_SIZE = "UmpLargeBodySize";

    /** Message property cleared before consumption. NOTE(review): presumably a compression flag - confirm. */
    private static final String PROP_LARGE_BODY_COMPRESSED = "UmpLargeBodyCompressed";

    /** How far before "now" the default consume timestamp is placed (30 minutes). */
    private static final long DEFAULT_CONSUME_TIMESTAMP_LOOKBACK_MILLIS = 1800000L;

    /** Request code used for the one-way log command sent by {@link #sendLogMessage}. */
    private static final int LOG_REQUEST_CODE = 919;

    /** Suffix (formatted through {@code CommonUtils.formatAppId}) appended to the instance name. */
    private static final String INSTANCE_NAME_VERSION_SUFFIX = "v2.0.1";

    private DefaultMQPushConsumer defaultMQPushConsumer;
    private MessageListener messageListener;
    private DelayTime[] delayTimeArray;
    private String subGroup;
    private MessageModel messageModel = MessageModel.CLUSTERING;
    private String consumeTimestamp =
            CommonUtils.timeMillisToHumanString3(System.currentTimeMillis() - DEFAULT_CONSUME_TIMESTAMP_LOOKBACK_MILLIS);
    private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET;
    private int consumeThreadMin = 8;
    private int consumeThreadMax = 32;
    private boolean groupWithTags = false;
    private int pullThresholdForQueue = 1000;
    private long pullInterval = 0;
    private int consumeMessageBatchMaxSize = 1;
    private int pullBatchSize = 32;
    protected ConsumerLargeBodyHelper largeBodyHelper = null;
    private long consumeTimeoutMinutes = 0;

    /** Creates a consumer and sets the inherited {@code clientLoginCode} to 18. */
    public Consumer() {
        this.clientLoginCode = 18;
    }

    /**
     * Prepares a message for the listener: replaces its body with whatever
     * {@code largeBodyHelper.fetchBody(message)} returns and clears the two large-body properties.
     *
     * @param message the message about to be handed to the listener
     * @return the same {@code message} instance, after modification
     * @throws UmpException declared by the signature; presumably raised by the body fetch
     */
    protected Message processMessageBeforeConsume(Message message) throws UmpException {
        message.setBody(this.largeBodyHelper.fetchBody(message));
        message.clearProperty(PROP_LARGE_BODY_SIZE);
        message.clearProperty(PROP_LARGE_BODY_COMPRESSED);
        return message;
    }

    /**
     * Builds the helper used by {@link #processMessageBeforeConsume(Message)}, backed by an
     * {@link HttpFsClient} configured with this client's app id and app secret.
     *
     * @return a new helper instance
     */
    public ConsumerLargeBodyHelper createLargeBodyHelper() {
        HttpFsClient httpFsClient = new HttpFsClient();
        httpFsClient.setAppid(getAppId());
        httpFsClient.setToken(getAppSecret());
        return new ConsumerLargeBodyHelper(httpFsClient);
    }

    /**
     * Creates and fully configures the underlying push consumer, stores it in
     * {@code defaultMQPushConsumer}, and registers this client in {@link ClientConfigMap}
     * under the consumer's MQ client id. The consumer is not started here.
     *
     * @return the configured (not yet started) consumer
     * @throws MQClientException if subscribing (or another client call) fails
     */
    private DefaultMQPushConsumer createConsumer() throws MQClientException {
        String formatAppId = CommonUtils.formatAppId(getAppId());
        String topic = getTopic();
        String consumerGroup = buildConsumerGroup(formatAppId, topic);
        logger.info("The consumerGroup is {}", consumerGroup);

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        // Publish to the field immediately, exactly as before, so later callbacks can see it.
        this.defaultMQPushConsumer = consumer;

        consumer.setNamesrvAddr(getConnectorUrl());
        consumer.setInstanceName(buildInstanceName(formatAppId, consumerGroup));
        consumer.subscribe(topic, getTags());
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // Only the first element is handled; consumeMessageBatchMaxSize is fixed at 1 in this class.
                return Consumer.this.doConsume(list.get(0), consumeConcurrentlyContext);
            }
        });
        consumer.setMessageModel(convertMessageModel(getMessageModel()));
        consumer.setConsumeFromWhere(convertConsumeFromWhere(getConsumeFromWhere()));
        if (!CommonUtils.isEmpty(getConsumeTimestamp())) {
            consumer.setConsumeTimestamp(getConsumeTimestamp());
        }
        consumer.setConsumeThreadMin(getConsumeThreadMin());
        consumer.setConsumeThreadMax(getConsumeThreadMax());
        consumer.setPullThresholdForQueue(getPullThresholdForQueue());
        consumer.setPullInterval(getPullInterval());
        consumer.setPullBatchSize(getPullBatchSize());
        consumer.setConsumeMessageBatchMaxSize(this.consumeMessageBatchMaxSize);
        consumer.setConsumeTimeout(this.consumeTimeoutMinutes);
        consumer.setConsumeTimeoutHook(new ConsumeTimeoutHook() {
            @Override
            public void doHook(MessageExt messageExt, long j) {
                Consumer.this.sendLogMessage(messageExt, LogOperateType.CONSUME_TIMEOUT, j);
            }
        });
        ClientConfigMap.put(consumer.buildMQClientId(), this);
        return consumer;
    }

    /**
     * Derives the consumer group name: {@code <appId>_<topic>[_<tags>][_<subGroup>]}, where the
     * tags part is added only when {@link #isGroupWithTags()} is set and the formatted tags are
     * non-empty, and the sub-group part only when a sub-group is configured.
     */
    private String buildConsumerGroup(String formatAppId, String topic) {
        StringBuilder group = new StringBuilder(formatAppId).append('_').append(topic);
        if (isGroupWithTags()) {
            String formatTags = CommonUtils.formatTags(getTags());
            if (CommonUtils.isNotEmpty(formatTags)) {
                group.append('_').append(formatTags);
            }
        }
        if (CommonUtils.isNotEmpty(this.subGroup)) {
            group.append('_').append(CommonUtils.formatAppId(this.subGroup));
        }
        return group.toString();
    }

    /**
     * Derives the instance name. In broadcasting mode it is based on the consumer group plus the
     * configured instance name (falling back to the client IP); otherwise on the formatted app id
     * plus a generated UUID. The formatted version suffix is appended in both cases.
     */
    private String buildInstanceName(String formatAppId, String consumerGroup) {
        String baseName = formatAppId + "_" + CommonUtils.generateUuid();
        MessageModel effectiveModel = getMessageModel() == null ? MessageModel.CLUSTERING : getMessageModel();
        if (MessageModel.BROADCASTING == effectiveModel) {
            String instancePart = CommonUtils.isEmpty(getInstanceName()) ? getClientIp() : getInstanceName();
            baseName = consumerGroup + "_" + instancePart;
        }
        return baseName + "_" + CommonUtils.formatAppId(INSTANCE_NAME_VERSION_SUFFIX);
    }

    /**
     * Converts one delivered message, hands it to the registered listener and reports the outcome.
     *
     * <p>Any {@link Exception} from conversion, pre-processing or the listener is logged and
     * treated as {@code RECONSUME_LATER}. A log record is sent in every case, including when an
     * {@link Error} propagates out of this method.
     *
     * @param messageExt the delivered message
     * @param context    consume context; its next-consume delay level is set on retry when a
     *                   delay-time array is configured
     * @return {@code RECONSUME_LATER} when the listener asked for a retry or an exception was
     *         caught, otherwise {@code CONSUME_SUCCESS}
     */
    private ConsumeConcurrentlyStatus doConsume(MessageExt messageExt, ConsumeConcurrentlyContext context) {
        // Pessimistic default: if something escapes before the listener returns, treat it as a retry.
        ConsumeStatus consumeStatus = ConsumeStatus.RECONSUME_LATER;
        ConsumeConcurrentlyStatus result = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        LogOperateType logOperateType = LogOperateType.RECEIVE_SUCCESS;
        long startMillis = System.currentTimeMillis();
        try {
            Message umpMessage = toUmpMessage(messageExt);
            processMessageBeforeConsume(umpMessage);
            consumeStatus = this.messageListener.consume(umpMessage);
        } catch (Exception e) {
            consumeStatus = ConsumeStatus.RECONSUME_LATER;
            logger.error(e.getMessage(), e);
        } finally {
            if (ConsumeStatus.RECONSUME_LATER == consumeStatus) {
                logOperateType = LogOperateType.RECEIVE_ERROR;
                result = ConsumeConcurrentlyStatus.RECONSUME_LATER;
                if (this.delayTimeArray != null && this.delayTimeArray.length > 0) {
                    context.setDelayLevelWhenNextConsume(convertDelayTimeLevel(messageExt.getDelayTimeLevel()));
                }
            }
            sendLogMessage(messageExt, logOperateType, startMillis);
        }
        return result;
    }

    /**
     * Registers the listener that receives consumed messages. Required before start
     * (see {@link #checkConfig()}).
     *
     * @param messageListener the listener to invoke for each message
     */
    public void subscribe(MessageListener messageListener) {
        this.messageListener = messageListener;
    }

    /**
     * Builds a log record for the given message and sends it one-way through the remoting client.
     * This is best-effort: any {@link Exception} is logged and swallowed so that logging can
     * never fail message consumption.
     *
     * @param messageExt     the message the record is about
     * @param logOperateType the outcome being recorded
     * @param j              start time in epoch milliseconds, used to compute the elapsed cost
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void sendLogMessage(MessageExt messageExt, LogOperateType logOperateType, long j) {
        try {
            Map<String, String> mapCoppy = CommonUtils.mapCoppy(messageExt.getProperties());
            mapCoppy.put("app_id", this.fixClientConfig.getAppId());
            mapCoppy.put("client_ip_address", this.fixClientConfig.getClientIp());
            mapCoppy.put("consumer_group", this.defaultMQPushConsumer.getConsumerGroup());
            mapCoppy.put("consumer_instance", this.defaultMQPushConsumer.getInstanceName());
            mapCoppy.put("message_model", String.valueOf(getMessageModel()));
            byte[] body = messageExt.getBody();
            int length = body == null ? 0 : body.length;
            // A positive large-body size property overrides the in-message body length.
            int largeBodySize = getLargeBodySize(mapCoppy);
            if (largeBodySize > 0) {
                length = largeBodySize;
            }
            MessageLogBo logBo = LogUtils.toLogBo(mapCoppy, ((MessageClientExt) messageExt).getOffsetMsgId(), messageExt.getTopic(), logOperateType, getConnectorIp(), length);
            logBo.setCostMillis(System.currentTimeMillis() - j);
            logBo.setReConsumeTimes(messageExt.getReconsumeTimes());
            RemotingCommand createRequestCommand = RemotingCommand.createRequestCommand(LOG_REQUEST_CODE, (CommandCustomHeader) null);
            // Encode explicitly as UTF-8 so the payload does not depend on the JVM's default charset.
            createRequestCommand.setBody(JSON.toJSONString(logBo).getBytes(StandardCharsets.UTF_8));
            this.remotingClient.invokeOneway((String) null, createRequestCommand, UmpConstants.TIMEOUT_MILLIS);
        } catch (Exception e) {
            logger.error("Failed to send log message.", e);
        }
    }

    /**
     * Reads the large-body size property from a property map.
     *
     * @param map message properties
     * @return the parsed size, or {@code -1} when the property is absent, empty or not an integer
     */
    protected int getLargeBodySize(Map<String, String> map) {
        String rawSize = map.get(PROP_LARGE_BODY_SIZE);
        if (!CommonUtils.isNotEmpty(rawSize)) {
            return -1;
        }
        try {
            return Integer.parseInt(rawSize);
        } catch (NumberFormatException ignored) {
            // A malformed size is treated the same as a missing one.
            return -1;
        }
    }

    /**
     * Starts the base client, creates the large-body helper, then creates and starts the
     * underlying push consumer.
     *
     * @throws MQClientException if the base client or the consumer fails to start
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.huawei.it.eip.ump.client.AbstractClient
    public void startClient() throws MQClientException {
        super.startClient();
        this.largeBodyHelper = createLargeBodyHelper();
        createConsumer().start();
    }

    /** Shuts down the underlying consumer (if any) and clears the reference to it. */
    @Override // com.huawei.it.eip.ump.client.AbstractClient
    protected void shutdownClient() {
        if (this.defaultMQPushConsumer != null) {
            shutdownScheduledExecutor();
            this.defaultMQPushConsumer.shutdown();
            this.defaultMQPushConsumer = null;
        }
    }

    /**
     * Like {@link #shutdownClient()} but delegates to the consumer's graceful shutdown.
     *
     * @param j        timeout value passed to the consumer's {@code shutdownGracefully}
     * @param timeUnit unit of {@code j}
     * @throws InterruptedException if the graceful shutdown is interrupted
     */
    protected void shutdownClientGracefully(long j, TimeUnit timeUnit) throws InterruptedException {
        if (this.defaultMQPushConsumer != null) {
            shutdownScheduledExecutor();
            this.defaultMQPushConsumer.shutdownGracefully(j, timeUnit);
            this.defaultMQPushConsumer = null;
        }
    }

    /**
     * Shuts down the client factory's scheduled executor when every link of the
     * consumer -> impl -> factory -> executor chain is present. Callers must ensure
     * {@code defaultMQPushConsumer} is non-null.
     */
    private void shutdownScheduledExecutor() {
        if (this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl() != null
                && this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory() != null
                && this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getScheduledExecutorService() != null) {
            this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getScheduledExecutorService().shutdown();
        }
    }

    /**
     * Gracefully shuts this client down once: the first call flips the {@code closed} flag,
     * shuts the consumer down gracefully and then shuts down the name-server service; later
     * calls are no-ops.
     *
     * @param j        timeout value for the graceful consumer shutdown
     * @param timeUnit unit of {@code j}
     * @throws UmpException wrapping any failure; if the cause is an interruption, the calling
     *                      thread's interrupt status is restored before the exception is thrown
     */
    public void shutdownGracefully(long j, TimeUnit timeUnit) throws UmpException {
        try {
            if (this.closed.compareAndSet(false, true)) {
                shutdownClientGracefully(j, timeUnit);
                this.namesrvService.shutdown();
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new UmpException("shutdownGracefully " + this.clientName + " encounter exception.", e);
        } catch (Exception e) {
            throw new UmpException("shutdownGracefully " + this.clientName + " encounter exception.", e);
        }
    }

    /** @return where consumption starts; defaults to {@code CONSUME_FROM_FIRST_OFFSET} */
    public ConsumeFromWhere getConsumeFromWhere() {
        return this.consumeFromWhere;
    }

    /** @param consumeFromWhere where consumption should start */
    public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
        this.consumeFromWhere = consumeFromWhere;
    }

    /** @return the consume timestamp string; defaults to 30 minutes before construction time */
    public String getConsumeTimestamp() {
        return this.consumeTimestamp;
    }

    /** @param str the consume timestamp; an empty value is not forwarded to the consumer */
    public void setConsumeTimestamp(String str) {
        this.consumeTimestamp = str;
    }

    /** @return the message model; defaults to {@code CLUSTERING} */
    public MessageModel getMessageModel() {
        return this.messageModel;
    }

    /** @param messageModel the message model; {@code null} is treated as {@code CLUSTERING} when the consumer is built */
    public void setMessageModel(MessageModel messageModel) {
        this.messageModel = messageModel;
    }

    /** @return the minimum consume thread count; defaults to 8 */
    public int getConsumeThreadMin() {
        return this.consumeThreadMin;
    }

    /** @param i the minimum consume thread count */
    public void setConsumeThreadMin(int i) {
        this.consumeThreadMin = i;
    }

    /** @return the maximum consume thread count; defaults to 32 */
    public int getConsumeThreadMax() {
        return this.consumeThreadMax;
    }

    /** @param i the maximum consume thread count */
    public void setConsumeThreadMax(int i) {
        this.consumeThreadMax = i;
    }

    /** @return whether the formatted tags are part of the consumer group name; defaults to {@code false} */
    public boolean isGroupWithTags() {
        return this.groupWithTags;
    }

    /** @param z whether the formatted tags should be part of the consumer group name */
    public void setGroupWithTags(boolean z) {
        this.groupWithTags = z;
    }

    /** @return the per-queue pull threshold; defaults to 1000 */
    public int getPullThresholdForQueue() {
        return this.pullThresholdForQueue;
    }

    /** @param i the per-queue pull threshold */
    public void setPullThresholdForQueue(int i) {
        this.pullThresholdForQueue = i;
    }

    /** @return the pull interval; defaults to 0 */
    public long getPullInterval() {
        return this.pullInterval;
    }

    /** @param j the pull interval */
    public void setPullInterval(long j) {
        this.pullInterval = j;
    }

    /** @return the pull batch size; defaults to 32 */
    public int getPullBatchSize() {
        return this.pullBatchSize;
    }

    /** @param i the pull batch size */
    public void setPullBatchSize(int i) {
        this.pullBatchSize = i;
    }

    /** @return the optional sub-group appended to the consumer group name */
    public String getSubGroup() {
        return this.subGroup;
    }

    /** @param str the optional sub-group appended to the consumer group name */
    public void setSubGroup(String str) {
        this.subGroup = str;
    }

    /** Maps this package's message model to the RocketMQ one; anything but {@code BROADCASTING} becomes {@code CLUSTERING}. */
    private org.apache.rocketmq.common.protocol.heartbeat.MessageModel convertMessageModel(MessageModel messageModel) {
        if (MessageModel.BROADCASTING == messageModel) {
            return org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING;
        }
        return org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING;
    }

    /** Maps this package's start position to the RocketMQ one; unknown or {@code null} values become {@code CONSUME_FROM_FIRST_OFFSET}. */
    private org.apache.rocketmq.common.consumer.ConsumeFromWhere convertConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
        if (ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET == consumeFromWhere) {
            return org.apache.rocketmq.common.consumer.ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
        }
        if (ConsumeFromWhere.CONSUME_FROM_TIMESTAMP == consumeFromWhere) {
            return org.apache.rocketmq.common.consumer.ConsumeFromWhere.CONSUME_FROM_TIMESTAMP;
        }
        return org.apache.rocketmq.common.consumer.ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET;
    }

    /**
     * Validates the configuration: runs the base checks and additionally requires a listener.
     *
     * @throws UmpException if the base check fails or no listener has been registered
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.huawei.it.eip.ump.client.AbstractClient
    public void checkConfig() throws UmpException {
        super.checkConfig();
        if (this.messageListener == null) {
            throw new UmpException("messageListener is required.");
        }
    }

    /** @return the configured retry delay steps (the internal array, not a copy), or {@code null} */
    public DelayTime[] getDelayTimeArray() {
        return this.delayTimeArray;
    }

    /**
     * Sets the retry delay steps. A non-empty input is de-duplicated and stored in natural
     * (declaration) order in a fresh array; {@code null} or an empty array is stored as given.
     *
     * @param delayTimeArr the delay steps; must not contain {@code null} elements
     */
    public void setDelayTimeArray(DelayTime[] delayTimeArr) {
        if (delayTimeArr != null && delayTimeArr.length > 0) {
            // EnumSet removes duplicates and iterates in natural order, so no extra sort is needed.
            EnumSet<DelayTime> distinct = EnumSet.noneOf(DelayTime.class);
            distinct.addAll(Arrays.asList(delayTimeArr));
            delayTimeArr = distinct.toArray(new DelayTime[0]);
        }
        this.delayTimeArray = delayTimeArr;
    }

    /**
     * Picks the delay level for the next retry from the configured delay steps.
     *
     * @param i the message's current delay level
     * @return {@code 0} when no steps are configured; {@code -1} when {@code i} has reached or
     *         passed the last configured step's level; otherwise the level of the first
     *         configured step that is strictly greater than {@code i}.
     *         NOTE(review): 0 and -1 presumably carry the RocketMQ meanings "broker-controlled
     *         retry" and "no retry" - confirm against the client in use.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public int convertDelayTimeLevel(int i) {
        if (this.delayTimeArray == null || this.delayTimeArray.length == 0) {
            return 0;
        }
        if (i >= this.delayTimeArray[this.delayTimeArray.length - 1].level()) {
            return -1;
        }
        for (DelayTime delayTime : this.delayTimeArray) {
            if (i < delayTime.level()) {
                return delayTime.level();
            }
        }
        return i;
    }

    /** @return the value forwarded to the consumer's {@code setConsumeTimeout}; defaults to 0 */
    public long getConsumeTimeoutMinutes() {
        return this.consumeTimeoutMinutes;
    }

    /** @param j the value forwarded to the consumer's {@code setConsumeTimeout} */
    public void setConsumeTimeoutMinutes(long j) {
        this.consumeTimeoutMinutes = j;
    }

    /** @return a diagnostic string with the main consumer settings followed by the base client's string form */
    @Override // com.huawei.it.eip.ump.client.config.ClientConfig
    public String toString() {
        return "Consumer [messageModel=" + this.messageModel + ", consumeFromWhere=" + this.consumeFromWhere + ", consumeThreadMin=" + this.consumeThreadMin + ", consumeThreadMax=" + this.consumeThreadMax + ", groupWithTags=" + this.groupWithTags + ", consumeTimestamp=" + this.consumeTimestamp + ", subGroup=" + this.subGroup + ", consumeTimeoutMinutes=" + this.consumeTimeoutMinutes + ", toString()=" + super.toString() + "]";
    }
}
