/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.pull;

import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.RocketMQConsumerFactory;
import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.pull.RocketMQAckCallback;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport;
import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.context.Lifecycle;
import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.util.CollectionUtils;

public class RocketMQMessageSource
extends AbstractMessageSource<Object>
implements DisposableBean,
Lifecycle {
    private static final Logger log = LoggerFactory.getLogger(RocketMQMessageSource.class);
    private DefaultLitePullConsumer consumer;
    private final Map<String, Collection<MessageQueue>> messageQueuesForTopic = new ConcurrentHashMap<String, Collection<MessageQueue>>();
    private volatile boolean running;
    private final String topic;
    private final MessageSelector messageSelector;
    private final ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties;
    private volatile Iterator<MessageExt> messageExtIterator = null;

    public RocketMQMessageSource(String name, ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
        this.topic = name;
        this.messageSelector = RocketMQUtils.getMessageSelector(((RocketMQConsumerProperties)extendedConsumerProperties.getExtension()).getSubscription());
        this.extendedConsumerProperties = extendedConsumerProperties;
    }

    public synchronized void start() {
        Instrumentation instrumentation = new Instrumentation(this.topic, this);
        try {
            if (this.isRunning()) {
                throw new IllegalStateException("pull consumer already running. " + ((Object)((Object)this)).toString());
            }
            this.consumer = RocketMQConsumerFactory.initPullConsumer(this.extendedConsumerProperties);
            this.consumer.subscribe(this.topic, this.messageSelector);
            this.consumer.setAutoCommit(false);
            this.consumer.registerTopicMessageQueueChangeListener(this.topic, this.messageQueuesForTopic::put);
            this.consumer.start();
            this.messageQueuesForTopic.put(this.topic, this.consumer.fetchMessageQueues(this.topic));
            instrumentation.markStartedSuccessfully();
        }
        catch (MQClientException e) {
            instrumentation.markStartFailed((Exception)((Object)e));
            log.error("DefaultMQPullConsumer startup error: " + e.getMessage(), (Throwable)e);
        }
        finally {
            InstrumentationManager.addHealthInstrumentation(instrumentation);
        }
        this.running = true;
    }

    private MessageQueue acquireCurrentMessageQueue(String topic, int queueId) {
        Collection<MessageQueue> messageQueueSet = this.messageQueuesForTopic.get(topic);
        if (CollectionUtils.isEmpty(messageQueueSet)) {
            return null;
        }
        for (MessageQueue messageQueue : messageQueueSet) {
            if (messageQueue.getQueueId() != queueId) continue;
            return messageQueue;
        }
        return null;
    }

    public synchronized void stop() {
        if (this.isRunning() && null != this.consumer) {
            this.consumer.unsubscribe(this.topic);
            this.consumer.shutdown();
            this.running = false;
        }
    }

    public synchronized boolean isRunning() {
        return this.running;
    }

    protected synchronized Object doReceive() {
        if (this.messageExtIterator == null) {
            List messageExtList = this.consumer.poll();
            if (CollectionUtils.isEmpty((Collection)messageExtList)) {
                return null;
            }
            this.messageExtIterator = messageExtList.iterator();
        }
        MessageExt messageExt = this.messageExtIterator.next();
        if (!this.messageExtIterator.hasNext()) {
            this.messageExtIterator = null;
        }
        if (null == messageExt) {
            return null;
        }
        MessageQueue messageQueue = this.acquireCurrentMessageQueue(messageExt.getTopic(), messageExt.getQueueId());
        if (messageQueue == null) {
            throw new IllegalArgumentException("The message queue is not in assigned list");
        }
        Message message = RocketMQMessageConverterSupport.convertMessage2Spring(messageExt);
        return MessageBuilder.fromMessage((Message)message).setHeader("acknowledgmentCallback", (Object)new RocketMQAckCallback(this.consumer, messageQueue, messageExt)).build();
    }

    public String getComponentType() {
        return "rocketmq:message-source";
    }
}

