From 6347f75de8803d4ab5209f89b965c1834a7d7942 Mon Sep 17 00:00:00 2001 From: Crazylychee Date: Wed, 20 Aug 2025 21:26:21 +0800 Subject: [PATCH 1/3] [ISSUE #9623] Fix Failing Tests In MessageStoreWithFilterTest --- .../filter/MessageStoreWithFilterTest.java | 359 +++++++++--------- 1 file changed, 190 insertions(+), 169 deletions(-) diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java index 84bca916998..16a5991d172 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java @@ -17,17 +17,6 @@ package org.apache.rocketmq.broker.filter; -import java.io.File; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; @@ -48,11 +37,21 @@ import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.awaitility.core.ThrowingRunnable; -import org.junit.After; import org.junit.Assume; -import org.junit.Before; import org.junit.Test; +import java.io.File; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @@ -63,47 +62,26 @@ public class MessageStoreWithFilterTest { private static final String TOPIC = "topic"; private static final int QUEUE_ID = 0; - private static final String STORE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "unit_test_store"; + private static final String BASE_STORE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "unit_test_store"; private static final int COMMIT_LOG_FILE_SIZE = 1024 * 1024 * 256; private static final int CQ_FILE_SIZE = 300000 * 20; private static final int CQ_EXT_FILE_SIZE = 300000 * 128; private static SocketAddress bornHost; - private static SocketAddress storeHost; - private DefaultMessageStore master; - - private ConsumerFilterManager filterManager; - - private int topicCount = 3; - - private int msgPerTopic = 30; + private final int topicCount = 3; + private final int msgPerTopic = 30; static { try { storeHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123); - } catch (UnknownHostException e) { + } catch (UnknownHostException ignored) { } try { bornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0); - } catch (UnknownHostException e) { - } - } - - @Before - public void init() throws Exception { - filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic); - master = gen(filterManager); - } - - @After - public void destroy() { - if (master != null) { - master.shutdown(); - master.destroy(); + } catch (UnknownHostException ignored) { } - UtilAll.deleteFile(new File(STORE_PATH)); } public MessageExtBrokerInner buildMessage() { @@ -126,7 +104,7 @@ public MessageExtBrokerInner buildMessage() { return msg; } - public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize, + public MessageStoreConfig buildStoreConfig(String storePath, int commitLogFileSize, int cqFileSize, boolean enableCqExt, int cqExtFileSize) { MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); messageStoreConfig.setMappedFileSizeCommitLog(commitLogFileSize); @@ -135,15 +113,18 @@ public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize messageStoreConfig.setMessageIndexEnable(false); messageStoreConfig.setEnableConsumeQueueExt(enableCqExt); - messageStoreConfig.setStorePathRootDir(STORE_PATH); - messageStoreConfig.setStorePathCommitLog(STORE_PATH + File.separator + "commitlog"); + messageStoreConfig.setStorePathRootDir(storePath); + messageStoreConfig.setStorePathCommitLog(storePath + File.separator + "commitlog"); return messageStoreConfig; } - protected DefaultMessageStore gen(ConsumerFilterManager filterManager) throws Exception { + protected DefaultMessageStore gen(String testName, ConsumerFilterManager filterManager) throws Exception { + String storePath = BASE_STORE_PATH + "_" + testName; + cleanStoreDirectory(storePath); + MessageStoreConfig messageStoreConfig = buildStoreConfig( - COMMIT_LOG_FILE_SIZE, CQ_FILE_SIZE, true, CQ_EXT_FILE_SIZE + storePath, COMMIT_LOG_FILE_SIZE, CQ_FILE_SIZE, true, CQ_EXT_FILE_SIZE ); BrokerConfig brokerConfig = new BrokerConfig(); @@ -152,15 +133,15 @@ protected DefaultMessageStore gen(ConsumerFilterManager filterManager) throws Ex brokerConfig.setExpectConsumerNumUseFilter(64); DefaultMessageStore master = new DefaultMessageStore( - messageStoreConfig, - new BrokerStatsManager(brokerConfig.getBrokerClusterName(), brokerConfig.isEnableDetailStat()), - new MessageArrivingListener() { - @Override - public void arriving(String topic, int queueId, long logicOffset, long tagsCode, - long msgStoreTime, byte[] filterBitMap, Map properties) { + messageStoreConfig, + new BrokerStatsManager(brokerConfig.getBrokerClusterName(), brokerConfig.isEnableDetailStat()), + new MessageArrivingListener() { + @Override + public void arriving(String topic, int queueId, long logicOffset, long tagsCode, + long msgStoreTime, byte[] filterBitMap, Map properties) { + } } - } - , brokerConfig, new ConcurrentHashMap<>()); + , brokerConfig, new ConcurrentHashMap<>()); master.getDispatcherList().addFirst(new CommitLogDispatcher() { @Override @@ -184,6 +165,27 @@ public void dispatch(DispatchRequest request) { return master; } + private void cleanStoreDirectory(String storePath) { + File storeDir = new File(storePath); + if (storeDir.exists()) { + UtilAll.deleteFile(storeDir); + } + } + + public void destroy(DefaultMessageStore master) { + if (master != null) { + String storePath = master.getMessageStoreConfig().getStorePathRootDir(); + try { + master.shutdown(); + master.destroy(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + cleanStoreDirectory(storePath); + } + } + } + protected List putMsg(DefaultMessageStore master, int topicCount, int msgCountPerTopic) throws Exception { List msgs = new ArrayList<>(); @@ -232,157 +234,176 @@ protected List filtered(List msgs, @Test public void testGetMessage_withFilterBitMapAndConsumerChanged() throws Exception { - List msgs = putMsg(master, topicCount, msgPerTopic); + ConsumerFilterManager filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic); + DefaultMessageStore master = gen("testFilterBitMapAndConsumerChanged", filterManager); - Thread.sleep(200); + try { + List msgs = putMsg(master, topicCount, msgPerTopic); - // reset consumer; - String topic = "topic" + 0; - String resetGroup = "CID_" + 2; - String normalGroup = "CID_" + 3; + Thread.sleep(2000); - { - // reset CID_2@topic0 to get all messages. - SubscriptionData resetSubData = new SubscriptionData(); - resetSubData.setExpressionType(ExpressionType.SQL92); - resetSubData.setTopic(topic); - resetSubData.setClassFilterMode(false); - resetSubData.setSubString("a is not null OR a is null"); + // reset consumer; + String topic = "topic" + 0; + String resetGroup = "CID_" + 2; + String normalGroup = "CID_" + 3; - ConsumerFilterData resetFilterData = ConsumerFilterManager.build(topic, - resetGroup, resetSubData.getSubString(), resetSubData.getExpressionType(), - System.currentTimeMillis()); + { + // reset CID_2@topic0 to get all messages. + SubscriptionData resetSubData = new SubscriptionData(); + resetSubData.setExpressionType(ExpressionType.SQL92); + resetSubData.setTopic(topic); + resetSubData.setClassFilterMode(false); + resetSubData.setSubString("a is not null OR a is null"); - GetMessageResult resetGetResult = master.getMessage(resetGroup, topic, QUEUE_ID, 0, 1000, - new ExpressionMessageFilter(resetSubData, resetFilterData, filterManager)); + ConsumerFilterData resetFilterData = ConsumerFilterManager.build(topic, + resetGroup, resetSubData.getSubString(), resetSubData.getExpressionType(), + System.currentTimeMillis()); - try { - assertThat(resetGetResult).isNotNull(); + GetMessageResult resetGetResult = master.getMessage(resetGroup, topic, QUEUE_ID, 0, 1000, + new ExpressionMessageFilter(resetSubData, resetFilterData, filterManager)); + try { + assertThat(resetGetResult).isNotNull(); - List filteredMsgs = filtered(msgs, resetFilterData); + List filteredMsgs = filtered(msgs, resetFilterData); - assertThat(resetGetResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size()); - } finally { - resetGetResult.release(); + assertThat(resetGetResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size()); + } finally { + assert resetGetResult != null; + resetGetResult.release(); + } } - } - - { - ConsumerFilterData normalFilterData = filterManager.get(topic, normalGroup); - assertThat(normalFilterData).isNotNull(); - assertThat(normalFilterData.getBornTime()).isLessThan(System.currentTimeMillis()); - SubscriptionData normalSubData = new SubscriptionData(); - normalSubData.setExpressionType(normalFilterData.getExpressionType()); - normalSubData.setTopic(topic); - normalSubData.setClassFilterMode(false); - normalSubData.setSubString(normalFilterData.getExpression()); + { + ConsumerFilterData normalFilterData = filterManager.get(topic, normalGroup); + assertThat(normalFilterData).isNotNull(); + assertThat(normalFilterData.getBornTime()).isLessThan(System.currentTimeMillis()); - List filteredMsgs = filtered(msgs, normalFilterData); + SubscriptionData normalSubData = new SubscriptionData(); + normalSubData.setExpressionType(normalFilterData.getExpressionType()); + normalSubData.setTopic(topic); + normalSubData.setClassFilterMode(false); + normalSubData.setSubString(normalFilterData.getExpression()); - GetMessageResult normalGetResult = master.getMessage(normalGroup, topic, QUEUE_ID, 0, 1000, - new ExpressionMessageFilter(normalSubData, normalFilterData, filterManager)); + List filteredMsgs = filtered(msgs, normalFilterData); - try { - assertThat(normalGetResult).isNotNull(); - assertThat(normalGetResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size()); - } finally { - normalGetResult.release(); + GetMessageResult normalGetResult = master.getMessage(normalGroup, topic, QUEUE_ID, 0, 1000, + new ExpressionMessageFilter(normalSubData, normalFilterData, filterManager)); + try { + assertThat(normalGetResult).isNotNull(); + assertThat(normalGetResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size()); + } finally { + assert normalGetResult != null; + normalGetResult.release(); + } } + } finally { + destroy(master); } } @Test public void testGetMessage_withFilterBitMap() throws Exception { - List msgs = putMsg(master, topicCount, msgPerTopic); - - Thread.sleep(100); + ConsumerFilterManager filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic); + DefaultMessageStore master = gen("testFilterBitMap", filterManager); - for (int i = 0; i < topicCount; i++) { - String realTopic = TOPIC + i; - - for (int j = 0; j < msgPerTopic; j++) { - String group = "CID_" + j; - - ConsumerFilterData filterData = filterManager.get(realTopic, group); - assertThat(filterData).isNotNull(); - - List filteredMsgs = filtered(msgs, filterData); - - SubscriptionData subscriptionData = new SubscriptionData(); - subscriptionData.setExpressionType(filterData.getExpressionType()); - subscriptionData.setTopic(filterData.getTopic()); - subscriptionData.setClassFilterMode(false); - subscriptionData.setSubString(filterData.getExpression()); - - GetMessageResult getMessageResult = master.getMessage(group, realTopic, QUEUE_ID, 0, 10000, - new ExpressionMessageFilter(subscriptionData, filterData, filterManager)); - String assertMsg = group + "-" + realTopic; - try { - assertThat(getMessageResult).isNotNull(); - assertThat(GetMessageStatus.FOUND).isEqualTo(getMessageResult.getStatus()); - assertThat(getMessageResult.getMessageBufferList()).isNotNull().isNotEmpty(); - assertThat(getMessageResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size()); - - for (ByteBuffer buffer : getMessageResult.getMessageBufferList()) { - MessageExt messageExt = MessageDecoder.decode(buffer.slice(), false); - assertThat(messageExt).isNotNull(); - - Object evlRet = null; - try { - evlRet = filterData.getCompiledExpression().evaluate(new MessageEvaluationContext(messageExt.getProperties())); - } catch (Exception e) { - e.printStackTrace(); - assertThat(true).isFalse(); - } + try { + List msgs = putMsg(master, topicCount, msgPerTopic); + + Thread.sleep(100); + + for (int i = 0; i < topicCount; i++) { + String realTopic = TOPIC + i; + + for (int j = 0; j < msgPerTopic; j++) { + String group = "CID_" + j; + + ConsumerFilterData filterData = filterManager.get(realTopic, group); + assertThat(filterData).isNotNull(); + + List filteredMsgs = filtered(msgs, filterData); + + SubscriptionData subscriptionData = new SubscriptionData(); + subscriptionData.setExpressionType(filterData.getExpressionType()); + subscriptionData.setTopic(filterData.getTopic()); + subscriptionData.setClassFilterMode(false); + subscriptionData.setSubString(filterData.getExpression()); + + GetMessageResult getMessageResult = master.getMessage(group, realTopic, QUEUE_ID, 0, 10000, + new ExpressionMessageFilter(subscriptionData, filterData, filterManager)); + String assertMsg = group + "-" + realTopic; + try { + assertThat(getMessageResult).isNotNull(); + assertThat(GetMessageStatus.FOUND).isEqualTo(getMessageResult.getStatus()); + assertThat(getMessageResult.getMessageBufferList()).isNotNull().isNotEmpty(); + assertThat(getMessageResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size()); + + for (ByteBuffer buffer : getMessageResult.getMessageBufferList()) { + MessageExt messageExt = MessageDecoder.decode(buffer.slice(), false); + assertThat(messageExt).isNotNull(); + + Object evlRet = null; + try { + evlRet = filterData.getCompiledExpression().evaluate(new MessageEvaluationContext(messageExt.getProperties())); + } catch (Exception e) { + e.printStackTrace(); + assertThat(true).isFalse(); + } - assertThat(evlRet).isNotNull().isEqualTo(Boolean.TRUE); + assertThat(evlRet).isNotNull().isEqualTo(Boolean.TRUE); - // check - boolean find = false; - for (MessageExtBrokerInner messageExtBrokerInner : filteredMsgs) { - if (messageExtBrokerInner.getMsgId().equals(messageExt.getMsgId())) { - find = true; + // check + boolean find = false; + for (MessageExtBrokerInner messageExtBrokerInner : filteredMsgs) { + if (messageExtBrokerInner.getMsgId().equals(messageExt.getMsgId())) { + find = true; + } } + assertThat(find).isTrue(); } - assertThat(find).isTrue(); + } finally { + assert getMessageResult != null; + getMessageResult.release(); } - } finally { - getMessageResult.release(); } } + } finally { + destroy(master); } } @Test public void testGetMessage_withFilter_checkTagsCode() throws Exception { - putMsg(master, topicCount, msgPerTopic); + ConsumerFilterManager filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic); + DefaultMessageStore master = gen("testFilterCheckTagsCode", filterManager); - await().atMost(3, TimeUnit.SECONDS).untilAsserted(new ThrowingRunnable() { - @Override - public void run() throws Throwable { - for (int i = 0; i < topicCount; i++) { - final String realTopic = TOPIC + i; - GetMessageResult getMessageResult = master.getMessage("test", realTopic, QUEUE_ID, 0, 10000, - new MessageFilter() { - @Override - public boolean isMatchedByConsumeQueue(Long tagsCode, - ConsumeQueueExt.CqExtUnit cqExtUnit) { - if (tagsCode != null && tagsCode <= ConsumeQueueExt.MAX_ADDR) { - return false; - } - return true; - } + try { + putMsg(master, topicCount, msgPerTopic); - @Override - public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, - Map properties) { - return true; - } - }); - assertThat(getMessageResult.getMessageCount()).isEqualTo(msgPerTopic); + await().atMost(3, TimeUnit.SECONDS).untilAsserted(new ThrowingRunnable() { + @Override + public void run() throws Throwable { + for (int i = 0; i < topicCount; i++) { + final String realTopic = TOPIC + i; + GetMessageResult getMessageResult = master.getMessage("test", realTopic, QUEUE_ID, 0, 10000, + new MessageFilter() { + @Override + public boolean isMatchedByConsumeQueue(Long tagsCode, + ConsumeQueueExt.CqExtUnit cqExtUnit) { + return tagsCode == null || tagsCode > ConsumeQueueExt.MAX_ADDR; + } + + @Override + public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, + Map properties) { + return true; + } + }); + assertThat(getMessageResult.getMessageCount()).isEqualTo(msgPerTopic); + } } - } - }); + }); + } finally { + destroy(master); + } } } From 7ad5a7dd55f47665b019d6d8b6c2a67f1456719d Mon Sep 17 00:00:00 2001 From: Crazylychee Date: Thu, 21 Aug 2025 00:15:21 +0800 Subject: [PATCH 2/3] [ISSUE #9623] Fix Failing Tests In MessageStoreWithFilterTest --- .../filter/MessageStoreWithFilterTest.java | 359 +++++++++--------- 1 file changed, 190 insertions(+), 169 deletions(-) diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java index 84bca916998..edd0c6740f4 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java @@ -17,17 +17,6 @@ package org.apache.rocketmq.broker.filter; -import java.io.File; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; @@ -48,11 +37,21 @@ import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.awaitility.core.ThrowingRunnable; -import org.junit.After; import org.junit.Assume; -import org.junit.Before; import org.junit.Test; +import java.io.File; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @@ -63,47 +62,26 @@ public class MessageStoreWithFilterTest { private static final String TOPIC = "topic"; private static final int QUEUE_ID = 0; - private static final String STORE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "unit_test_store"; + private static final String BASE_STORE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "unit_test_store"; private static final int COMMIT_LOG_FILE_SIZE = 1024 * 1024 * 256; private static final int CQ_FILE_SIZE = 300000 * 20; private static final int CQ_EXT_FILE_SIZE = 300000 * 128; private static SocketAddress bornHost; - private static SocketAddress storeHost; - private DefaultMessageStore master; - - private ConsumerFilterManager filterManager; - - private int topicCount = 3; - - private int msgPerTopic = 30; + private final int topicCount = 3; + private final int msgPerTopic = 30; static { try { storeHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123); - } catch (UnknownHostException e) { + } catch (UnknownHostException ignored) { } try { bornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0); - } catch (UnknownHostException e) { - } - } - - @Before - public void init() throws Exception { - filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic); - master = gen(filterManager); - } - - @After - public void destroy() { - if (master != null) { - master.shutdown(); - master.destroy(); + } catch (UnknownHostException ignored) { } - UtilAll.deleteFile(new File(STORE_PATH)); } public MessageExtBrokerInner buildMessage() { @@ -126,7 +104,7 @@ public MessageExtBrokerInner buildMessage() { return msg; } - public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize, + public MessageStoreConfig buildStoreConfig(String storePath, int commitLogFileSize, int cqFileSize, boolean enableCqExt, int cqExtFileSize) { MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); messageStoreConfig.setMappedFileSizeCommitLog(commitLogFileSize); @@ -135,15 +113,18 @@ public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize messageStoreConfig.setMessageIndexEnable(false); messageStoreConfig.setEnableConsumeQueueExt(enableCqExt); - messageStoreConfig.setStorePathRootDir(STORE_PATH); - messageStoreConfig.setStorePathCommitLog(STORE_PATH + File.separator + "commitlog"); + messageStoreConfig.setStorePathRootDir(storePath); + messageStoreConfig.setStorePathCommitLog(storePath + File.separator + "commitlog"); return messageStoreConfig; } - protected DefaultMessageStore gen(ConsumerFilterManager filterManager) throws Exception { + protected DefaultMessageStore gen(String testName, ConsumerFilterManager filterManager) throws Exception { + String storePath = BASE_STORE_PATH + "_" + testName; + cleanStoreDirectory(storePath); + MessageStoreConfig messageStoreConfig = buildStoreConfig( - COMMIT_LOG_FILE_SIZE, CQ_FILE_SIZE, true, CQ_EXT_FILE_SIZE + storePath, COMMIT_LOG_FILE_SIZE, CQ_FILE_SIZE, true, CQ_EXT_FILE_SIZE ); BrokerConfig brokerConfig = new BrokerConfig(); @@ -152,15 +133,15 @@ protected DefaultMessageStore gen(ConsumerFilterManager filterManager) throws Ex brokerConfig.setExpectConsumerNumUseFilter(64); DefaultMessageStore master = new DefaultMessageStore( - messageStoreConfig, - new BrokerStatsManager(brokerConfig.getBrokerClusterName(), brokerConfig.isEnableDetailStat()), - new MessageArrivingListener() { - @Override - public void arriving(String topic, int queueId, long logicOffset, long tagsCode, - long msgStoreTime, byte[] filterBitMap, Map properties) { + messageStoreConfig, + new BrokerStatsManager(brokerConfig.getBrokerClusterName(), brokerConfig.isEnableDetailStat()), + new MessageArrivingListener() { + @Override + public void arriving(String topic, int queueId, long logicOffset, long tagsCode, + long msgStoreTime, byte[] filterBitMap, Map properties) { + } } - } - , brokerConfig, new ConcurrentHashMap<>()); + , brokerConfig, new ConcurrentHashMap<>()); master.getDispatcherList().addFirst(new CommitLogDispatcher() { @Override @@ -184,6 +165,27 @@ public void dispatch(DispatchRequest request) { return master; } + private void cleanStoreDirectory(String storePath) { + File storeDir = new File(storePath); + if (storeDir.exists()) { + UtilAll.deleteFile(storeDir); + } + } + + public void destroy(DefaultMessageStore master) { + if (master != null) { + String storePath = master.getMessageStoreConfig().getStorePathRootDir(); + try { + master.shutdown(); + master.destroy(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + cleanStoreDirectory(storePath); + } + } + } + protected List putMsg(DefaultMessageStore master, int topicCount, int msgCountPerTopic) throws Exception { List msgs = new ArrayList<>(); @@ -232,157 +234,176 @@ protected List filtered(List msgs, @Test public void testGetMessage_withFilterBitMapAndConsumerChanged() throws Exception { - List msgs = putMsg(master, topicCount, msgPerTopic); + ConsumerFilterManager filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic); + DefaultMessageStore master = gen("testFilterBitMapAndConsumerChanged", filterManager); - Thread.sleep(200); + try { + List msgs = putMsg(master, topicCount, msgPerTopic); - // reset consumer; - String topic = "topic" + 0; - String resetGroup = "CID_" + 2; - String normalGroup = "CID_" + 3; + Thread.sleep(200); - { - // reset CID_2@topic0 to get all messages. - SubscriptionData resetSubData = new SubscriptionData(); - resetSubData.setExpressionType(ExpressionType.SQL92); - resetSubData.setTopic(topic); - resetSubData.setClassFilterMode(false); - resetSubData.setSubString("a is not null OR a is null"); + // reset consumer; + String topic = "topic" + 0; + String resetGroup = "CID_" + 2; + String normalGroup = "CID_" + 3; - ConsumerFilterData resetFilterData = ConsumerFilterManager.build(topic, - resetGroup, resetSubData.getSubString(), resetSubData.getExpressionType(), - System.currentTimeMillis()); + { + // reset CID_2@topic0 to get all messages. + SubscriptionData resetSubData = new SubscriptionData(); + resetSubData.setExpressionType(ExpressionType.SQL92); + resetSubData.setTopic(topic); + resetSubData.setClassFilterMode(false); + resetSubData.setSubString("a is not null OR a is null"); - GetMessageResult resetGetResult = master.getMessage(resetGroup, topic, QUEUE_ID, 0, 1000, - new ExpressionMessageFilter(resetSubData, resetFilterData, filterManager)); + ConsumerFilterData resetFilterData = ConsumerFilterManager.build(topic, + resetGroup, resetSubData.getSubString(), resetSubData.getExpressionType(), + System.currentTimeMillis()); - try { - assertThat(resetGetResult).isNotNull(); + GetMessageResult resetGetResult = master.getMessage(resetGroup, topic, QUEUE_ID, 0, 1000, + new ExpressionMessageFilter(resetSubData, resetFilterData, filterManager)); + try { + assertThat(resetGetResult).isNotNull(); - List filteredMsgs = filtered(msgs, resetFilterData); + List filteredMsgs = filtered(msgs, resetFilterData); - assertThat(resetGetResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size()); - } finally { - resetGetResult.release(); + assertThat(resetGetResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size()); + } finally { + assert resetGetResult != null; + resetGetResult.release(); + } } - } - - { - ConsumerFilterData normalFilterData = filterManager.get(topic, normalGroup); - assertThat(normalFilterData).isNotNull(); - assertThat(normalFilterData.getBornTime()).isLessThan(System.currentTimeMillis()); - SubscriptionData normalSubData = new SubscriptionData(); - normalSubData.setExpressionType(normalFilterData.getExpressionType()); - normalSubData.setTopic(topic); - normalSubData.setClassFilterMode(false); - normalSubData.setSubString(normalFilterData.getExpression()); + { + ConsumerFilterData normalFilterData = filterManager.get(topic, normalGroup); + assertThat(normalFilterData).isNotNull(); + assertThat(normalFilterData.getBornTime()).isLessThan(System.currentTimeMillis()); - List filteredMsgs = filtered(msgs, normalFilterData); + SubscriptionData normalSubData = new SubscriptionData(); + normalSubData.setExpressionType(normalFilterData.getExpressionType()); + normalSubData.setTopic(topic); + normalSubData.setClassFilterMode(false); + normalSubData.setSubString(normalFilterData.getExpression()); - GetMessageResult normalGetResult = master.getMessage(normalGroup, topic, QUEUE_ID, 0, 1000, - new ExpressionMessageFilter(normalSubData, normalFilterData, filterManager)); + List filteredMsgs = filtered(msgs, normalFilterData); - try { - assertThat(normalGetResult).isNotNull(); - assertThat(normalGetResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size()); - } finally { - normalGetResult.release(); + GetMessageResult normalGetResult = master.getMessage(normalGroup, topic, QUEUE_ID, 0, 1000, + new ExpressionMessageFilter(normalSubData, normalFilterData, filterManager)); + try { + assertThat(normalGetResult).isNotNull(); + assertThat(normalGetResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size()); + } finally { + assert normalGetResult != null; + normalGetResult.release(); + } } + } finally { + destroy(master); } } @Test public void testGetMessage_withFilterBitMap() throws Exception { - List msgs = putMsg(master, topicCount, msgPerTopic); - - Thread.sleep(100); + ConsumerFilterManager filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic); + DefaultMessageStore master = gen("testFilterBitMap", filterManager); - for (int i = 0; i < topicCount; i++) { - String realTopic = TOPIC + i; - - for (int j = 0; j < msgPerTopic; j++) { - String group = "CID_" + j; - - ConsumerFilterData filterData = filterManager.get(realTopic, group); - assertThat(filterData).isNotNull(); - - List filteredMsgs = filtered(msgs, filterData); - - SubscriptionData subscriptionData = new SubscriptionData(); - subscriptionData.setExpressionType(filterData.getExpressionType()); - subscriptionData.setTopic(filterData.getTopic()); - subscriptionData.setClassFilterMode(false); - subscriptionData.setSubString(filterData.getExpression()); - - GetMessageResult getMessageResult = master.getMessage(group, realTopic, QUEUE_ID, 0, 10000, - new ExpressionMessageFilter(subscriptionData, filterData, filterManager)); - String assertMsg = group + "-" + realTopic; - try { - assertThat(getMessageResult).isNotNull(); - assertThat(GetMessageStatus.FOUND).isEqualTo(getMessageResult.getStatus()); - assertThat(getMessageResult.getMessageBufferList()).isNotNull().isNotEmpty(); - assertThat(getMessageResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size()); - - for (ByteBuffer buffer : getMessageResult.getMessageBufferList()) { - MessageExt messageExt = MessageDecoder.decode(buffer.slice(), false); - assertThat(messageExt).isNotNull(); - - Object evlRet = null; - try { - evlRet = filterData.getCompiledExpression().evaluate(new MessageEvaluationContext(messageExt.getProperties())); - } catch (Exception e) { - e.printStackTrace(); - assertThat(true).isFalse(); - } + try { + List msgs = putMsg(master, topicCount, msgPerTopic); + + Thread.sleep(100); + + for (int i = 0; i < topicCount; i++) { + String realTopic = TOPIC + i; + + for (int j = 0; j < msgPerTopic; j++) { + String group = "CID_" + j; + + ConsumerFilterData filterData = filterManager.get(realTopic, group); + assertThat(filterData).isNotNull(); + + List filteredMsgs = filtered(msgs, filterData); + + SubscriptionData subscriptionData = new SubscriptionData(); + subscriptionData.setExpressionType(filterData.getExpressionType()); + subscriptionData.setTopic(filterData.getTopic()); + subscriptionData.setClassFilterMode(false); + subscriptionData.setSubString(filterData.getExpression()); + + GetMessageResult getMessageResult = master.getMessage(group, realTopic, QUEUE_ID, 0, 10000, + new ExpressionMessageFilter(subscriptionData, filterData, filterManager)); + String assertMsg = group + "-" + realTopic; + try { + assertThat(getMessageResult).isNotNull(); + assertThat(GetMessageStatus.FOUND).isEqualTo(getMessageResult.getStatus()); + assertThat(getMessageResult.getMessageBufferList()).isNotNull().isNotEmpty(); + assertThat(getMessageResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size()); + + for (ByteBuffer buffer : getMessageResult.getMessageBufferList()) { + MessageExt messageExt = MessageDecoder.decode(buffer.slice(), false); + assertThat(messageExt).isNotNull(); + + Object evlRet = null; + try { + evlRet = filterData.getCompiledExpression().evaluate(new MessageEvaluationContext(messageExt.getProperties())); + } catch (Exception e) { + e.printStackTrace(); + assertThat(true).isFalse(); + } - assertThat(evlRet).isNotNull().isEqualTo(Boolean.TRUE); + assertThat(evlRet).isNotNull().isEqualTo(Boolean.TRUE); - // check - boolean find = false; - for (MessageExtBrokerInner messageExtBrokerInner : filteredMsgs) { - if (messageExtBrokerInner.getMsgId().equals(messageExt.getMsgId())) { - find = true; + // check + boolean find = false; + for (MessageExtBrokerInner messageExtBrokerInner : filteredMsgs) { + if (messageExtBrokerInner.getMsgId().equals(messageExt.getMsgId())) { + find = true; + } } + assertThat(find).isTrue(); } - assertThat(find).isTrue(); + } finally { + assert getMessageResult != null; + getMessageResult.release(); } - } finally { - getMessageResult.release(); } } + } finally { + destroy(master); } } @Test public void testGetMessage_withFilter_checkTagsCode() throws Exception { - putMsg(master, topicCount, msgPerTopic); + ConsumerFilterManager filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic); + DefaultMessageStore master = gen("testFilterCheckTagsCode", filterManager); - await().atMost(3, TimeUnit.SECONDS).untilAsserted(new ThrowingRunnable() { - @Override - public void run() throws Throwable { - for (int i = 0; i < topicCount; i++) { - final String realTopic = TOPIC + i; - GetMessageResult getMessageResult = master.getMessage("test", realTopic, QUEUE_ID, 0, 10000, - new MessageFilter() { - @Override - public boolean isMatchedByConsumeQueue(Long tagsCode, - ConsumeQueueExt.CqExtUnit cqExtUnit) { - if (tagsCode != null && tagsCode <= ConsumeQueueExt.MAX_ADDR) { - return false; - } - return true; - } + try { + putMsg(master, topicCount, msgPerTopic); - @Override - public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, - Map properties) { - return true; - } - }); - assertThat(getMessageResult.getMessageCount()).isEqualTo(msgPerTopic); + await().atMost(3, TimeUnit.SECONDS).untilAsserted(new ThrowingRunnable() { + @Override + public void run() throws Throwable { + for (int i = 0; i < topicCount; i++) { + final String realTopic = TOPIC + i; + GetMessageResult getMessageResult = master.getMessage("test", realTopic, QUEUE_ID, 0, 10000, + new MessageFilter() { + @Override + public boolean isMatchedByConsumeQueue(Long tagsCode, + ConsumeQueueExt.CqExtUnit cqExtUnit) { + return tagsCode == null || tagsCode > ConsumeQueueExt.MAX_ADDR; + } + + @Override + public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, + Map properties) { + return true; + } + }); + assertThat(getMessageResult.getMessageCount()).isEqualTo(msgPerTopic); + } } - } - }); + }); + } finally { + destroy(master); + } } } From e6afed823f9cf462dc1bf5363a94d103573d6d83 Mon Sep 17 00:00:00 2001 From: Crazylychee Date: Sat, 30 Aug 2025 16:32:01 +0800 Subject: [PATCH 3/3] Fix test --- .../rocketmq/broker/filter/MessageStoreWithFilterTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java index edd0c6740f4..9f2db037136 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java @@ -330,7 +330,6 @@ public void testGetMessage_withFilterBitMap() throws Exception { GetMessageResult getMessageResult = master.getMessage(group, realTopic, QUEUE_ID, 0, 10000, new ExpressionMessageFilter(subscriptionData, filterData, filterManager)); - String assertMsg = group + "-" + realTopic; try { assertThat(getMessageResult).isNotNull(); assertThat(GetMessageStatus.FOUND).isEqualTo(getMessageResult.getStatus()); @@ -381,7 +380,7 @@ public void testGetMessage_withFilter_checkTagsCode() throws Exception { await().atMost(3, TimeUnit.SECONDS).untilAsserted(new ThrowingRunnable() { @Override - public void run() throws Throwable { + public void run() { for (int i = 0; i < topicCount; i++) { final String realTopic = TOPIC + i; GetMessageResult getMessageResult = master.getMessage("test", realTopic, QUEUE_ID, 0, 10000,