From 8451f39dd37dd70ae5257d1bff93871f190ad709 Mon Sep 17 00:00:00 2001 From: ccwss <1782935682@qq.com> Date: Fri, 28 Nov 2025 09:31:03 +0800 Subject: [PATCH 1/2] consume_from_where not working during initial launch --- .../config/v2/ConsumerOffsetManagerV2.java | 17 ++++++++++++ .../broker/offset/ConsumerOffsetManager.java | 14 ++++++++++ .../offset/LmqConsumerOffsetManager.java | 12 +++++++++ .../processor/ConsumerManageProcessor.java | 26 ++++++++++--------- .../ConsumerManageProcessorTest.java | 1 + 5 files changed, 58 insertions(+), 12 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java index e14ac0bb628..ad48eadd32f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java @@ -404,6 +404,23 @@ public long queryOffset(String group, String topic, int queueId) { } } + @Override + public boolean hasOffsetRecord(final String group, final String topic) { + if (!MixAll.isLmq(topic)) { + return super.hasOffsetRecord(group, topic); + } + + ByteBuf keyBuf = keyOfConsumerOffset(group, topic, 0); + try { + byte[] slice = configStorage.get(keyBuf.nioBuffer()); + return slice != null; + } catch (RocksDBException e) { + throw new RuntimeException(e); + } finally { + keyBuf.release(); + } + } + @Override public void commitPullOffset(String clientHost, String group, String topic, int queueId, long offset) { if (!MixAll.isLmq(topic)) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index 3eee9fc559a..67dff451b82 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -258,6 +258,20 @@ public long queryOffset(final String group, final String topic, final int queueI return -1L; } + public boolean hasOffsetRecord(final String group, final String topic) { + String key = topic + TOPIC_GROUP_SEPARATOR + group; + + if (this.brokerController.getBrokerConfig().isUseServerSideResetOffset()) { + Map reset = resetOffsetTable.get(key); + if (reset != null) { + return true; + } + } + + ConcurrentMap map = this.offsetTable.get(key); + return map != null; + } + /** * Query pull offset in pullOffsetTable * @param group Consumer group diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java index a565ad07c3a..391ac8fbd47 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java @@ -68,6 +68,18 @@ public Map queryOffset(final String group, final String topic) { return map; } + @Override + public boolean hasOffsetRecord(final String group, final String topic) { + if (!MixAll.isLmq(group)) { + return super.hasOffsetRecord(group, topic); + } + // topic@group + String key = topic + TOPIC_GROUP_SEPARATOR + group; + + Long offset = lmqOffsetTable.get(key); + return offset != null; + } + @Override public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java index dfa755d7c44..4ff008ba64b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java @@ -327,18 +327,20 @@ private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingC response.setCode(ResponseCode.SUCCESS); response.setRemark(null); } else { - long minOffset = - this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), - requestHeader.getQueueId()); - if (requestHeader.getSetZeroIfNotFound() != null && Boolean.FALSE.equals(requestHeader.getSetZeroIfNotFound())) { - response.setCode(ResponseCode.QUERY_NOT_FOUND); - response.setRemark("Not found, do not set to zero, maybe this group boot first"); - } else if (minOffset <= 0 - && this.brokerController.getMessageStore().checkInMemByConsumeOffset( - requestHeader.getTopic(), requestHeader.getQueueId(), 0, 1)) { - responseHeader.setOffset(0L); - response.setCode(ResponseCode.SUCCESS); - response.setRemark(null); + if (this.brokerController.getConsumerOffsetManager().hasOffsetRecord(requestHeader.getConsumerGroup(), requestHeader.getTopic())) { + long minOffset = + this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), + requestHeader.getQueueId()); + if (requestHeader.getSetZeroIfNotFound() != null && Boolean.FALSE.equals(requestHeader.getSetZeroIfNotFound())) { + response.setCode(ResponseCode.QUERY_NOT_FOUND); + response.setRemark("Not found, do not set to zero, maybe this group boot first"); + } else if (minOffset <= 0 + && this.brokerController.getMessageStore().checkInMemByConsumeOffset( + requestHeader.getTopic(), requestHeader.getQueueId(), 0, 1)) { + responseHeader.setOffset(0L); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + } } else { response.setCode(ResponseCode.QUERY_NOT_FOUND); response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first"); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java index 6b3c2578af3..7fcbb051ffb 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java @@ -175,6 +175,7 @@ public void testQueryConsumerOffset() throws RemotingCommandException, Execution when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager); when(consumerOffsetManager.queryOffset(anyString(),anyString(),anyInt())).thenReturn(0L); + when(consumerOffsetManager.hasOffsetRecord(anyString(), anyString())).thenReturn(true); response = consumerManageProcessor.processRequest(handlerContext, request); assertThat(response).isNotNull(); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); From 59a9e6e18e5fdf6badf66d55108117acc910e482 Mon Sep 17 00:00:00 2001 From: ccwss <1782935682@qq.com> Date: Tue, 2 Dec 2025 10:01:59 +0800 Subject: [PATCH 2/2] fix test --- .../test/client/consumer/filter/SqlFilterIT.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java index 88afbeef2a0..28e306f1e13 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java @@ -74,9 +74,7 @@ public void testFilterConsumer() throws Exception { producer.send("TagC", msgSize); Assert.assertEquals("Not all sent succeeded", msgSize * 3, producer.getAllUndupMsgBody().size()); consumer.getListener().waitForMessageConsume(msgSize * 2, CONSUME_TIME); - assertThat(producer.getAllMsgBody()) - .containsAllIn(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), - consumer.getListener().getAllMsgBody())); + assertThat(producer.getAllMsgBody()).containsAllIn(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), consumer.getListener().getAllMsgBody())); assertThat(consumer.getListener().getAllMsgBody().size()).isEqualTo(msgSize * 2); } @@ -90,7 +88,8 @@ public void testFilterPullConsumer() throws Exception { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group); consumer.setNamesrvAddr(NAMESRV_ADDR); consumer.start(); - Thread.sleep(3000); + consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic); + consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory().sendHeartbeatToAllBrokerWithLock(); producer.send("TagA", msgSize); producer.send("TagB", msgSize); producer.send("TagC", msgSize); @@ -102,8 +101,7 @@ public void testFilterPullConsumer() throws Exception { SINGLE_MQ: while (true) { try { - PullResult pullResult = - consumer.pull(mq, selector, getMessageQueueOffset(mq), 32); + PullResult pullResult = consumer.pull(mq, selector, getMessageQueueOffset(mq), 32); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: