diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index f85dcc7b459..f5b202b16dd 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -63,6 +63,8 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.sysflag.PullSysFlag; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.protocol.NamespaceUtil; @@ -73,8 +75,6 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; -import org.apache.rocketmq.logging.org.slf4j.Logger; -import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; public class DefaultLitePullConsumerImpl implements MQConsumerInner { @@ -329,8 +329,8 @@ public synchronized void start() throws MQClientException { private void initScheduledThreadPoolExecutor() { this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor( - this.defaultLitePullConsumer.getPullThreadNums(), - new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup()) + this.defaultLitePullConsumer.getPullThreadNums(), + new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup()) ); } @@ -912,8 +912,8 @@ public void run() { scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS); if ((consumeRequestFlowControlTimes++ % 1000) == 0) { log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", - (int)Math.ceil((double)defaultLitePullConsumer.getPullThresholdForAll() / defaultLitePullConsumer.getPullBatchSize()), - consumeRequestCache.size(), consumeRequestFlowControlTimes); + (int) Math.ceil((double) defaultLitePullConsumer.getPullThresholdForAll() / defaultLitePullConsumer.getPullBatchSize()), + consumeRequestCache.size(), consumeRequestFlowControlTimes); } return; } @@ -1122,7 +1122,15 @@ public Set subscriptions() { Set subSet = new HashSet<>(); subSet.addAll(this.rebalanceImpl.getSubscriptionInner().values()); - + if (!topicToSubExpression.isEmpty()) { + for (Map.Entry entry : topicToSubExpression.entrySet()) { + try { + subSet.add(FilterAPI.buildSubscriptionData(entry.getKey(), entry.getValue())); + } catch (Exception e) { + log.warn("BuildSubscriptionData error, topic=[{}], subString=[{}] ", entry.getKey(), entry.getValue(), e); + } + } + } return subSet; }