From 8fa909e42a2b45e83d78aec93682f731756f3ea5 Mon Sep 17 00:00:00 2001 From: absolute8511 Date: Thu, 23 Oct 2025 12:29:09 +0800 Subject: [PATCH 1/5] feat: use data version from master while sync slave and fix delete config while sync Change-Id: I42b2e7b1acc6836d3c90973801c9defba5f1325c --- .../org/apache/rocketmq/broker/slave/SlaveSynchronize.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java index 2e3134016c7..026683c3bc8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java @@ -94,13 +94,12 @@ private void syncTopicConfig() { Map.Entry entry = iterator.next(); if (!newTopicConfigTable.containsKey(entry.getKey())) { iterator.remove(); + topicConfigManager.deleteTopicConfig(entry.getKey()); } - topicConfigManager.deleteTopicConfig(entry.getKey()); } //update newTopicConfigTable.values().forEach(topicConfigManager::putTopicConfig); - topicConfigManager.updateDataVersion(); topicConfigManager.persist(); } if (topicWrapper.getTopicQueueMappingDetailMap() != null @@ -189,12 +188,11 @@ private void syncSubscriptionGroupConfig() { Map.Entry configEntry = iterator.next(); if (!newSubscriptionGroupTable.containsKey(configEntry.getKey())) { iterator.remove(); + subscriptionGroupManager.deleteSubscriptionGroupConfig(configEntry.getKey()); } - subscriptionGroupManager.deleteSubscriptionGroupConfig(configEntry.getKey()); } // update newSubscriptionGroupTable.values().forEach(subscriptionGroupManager::putSubscriptionGroupConfig); - subscriptionGroupManager.updateDataVersion(); // persist subscriptionGroupManager.persist(); LOGGER.info("Update slave Subscription Group from master, {}", masterAddrBak); From fddc5d8b837170d4fe913fd4a7e6c61138c1a116 Mon Sep 17 00:00:00 2001 From: absolute8511 Date: Thu, 23 Oct 2025 14:48:46 +0800 Subject: [PATCH 2/5] fix: assign new version using master while sync slave Change-Id: I7ec20607a84499fe5a6607763013c59d726aedc3 --- .../org/apache/rocketmq/broker/slave/SlaveSynchronize.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java index 026683c3bc8..9d8a5958139 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java @@ -83,8 +83,6 @@ private void syncTopicConfig() { TopicConfigManager topicConfigManager = this.brokerController.getTopicConfigManager(); if (!topicConfigManager.getDataVersion().equals(topicWrapper.getDataVersion())) { - topicConfigManager.getDataVersion().assignNewOne(topicWrapper.getDataVersion()); - ConcurrentMap newTopicConfigTable = topicWrapper.getTopicConfigTable(); ConcurrentMap topicConfigTable = topicConfigManager.getTopicConfigTable(); @@ -100,6 +98,7 @@ private void syncTopicConfig() { //update newTopicConfigTable.values().forEach(topicConfigManager::putTopicConfig); + topicConfigManager.getDataVersion().assignNewOne(topicWrapper.getDataVersion()); topicConfigManager.persist(); } if (topicWrapper.getTopicQueueMappingDetailMap() != null @@ -176,7 +175,6 @@ private void syncSubscriptionGroupConfig() { if (!this.brokerController.getSubscriptionGroupManager().getDataVersion() .equals(subscriptionWrapper.getDataVersion())) { SubscriptionGroupManager subscriptionGroupManager = this.brokerController.getSubscriptionGroupManager(); - subscriptionGroupManager.getDataVersion().assignNewOne(subscriptionWrapper.getDataVersion()); ConcurrentMap curSubscriptionGroupTable = subscriptionGroupManager.getSubscriptionGroupTable(); @@ -193,6 +191,7 @@ private void syncSubscriptionGroupConfig() { } // update newSubscriptionGroupTable.values().forEach(subscriptionGroupManager::putSubscriptionGroupConfig); + subscriptionGroupManager.getDataVersion().assignNewOne(subscriptionWrapper.getDataVersion()); // persist subscriptionGroupManager.persist(); LOGGER.info("Update slave Subscription Group from master, {}", masterAddrBak); From dbbfbc0936516e379be657ce81a42c8257c0caeb Mon Sep 17 00:00:00 2001 From: absolute8511 Date: Thu, 23 Oct 2025 15:28:06 +0800 Subject: [PATCH 3/5] feat: allow set dataVersion directly for topic/group config sync Change-Id: Ic845794350e8bdaa847bdd0ae4b3e40ab1ad6311 --- .../apache/rocketmq/broker/RocksDBConfigManager.java | 5 +++++ .../config/v1/RocksDBSubscriptionGroupManager.java | 10 ++++++++++ .../broker/config/v1/RocksDBTopicConfigManager.java | 10 ++++++++++ .../rocketmq/broker/topic/TopicConfigManager.java | 4 +++- 4 files changed, 28 insertions(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java index ee2d4e54a6a..bcf4d1c8efd 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java @@ -128,6 +128,11 @@ public void updateKvDataVersion() throws Exception { this.configRocksDBStorage.updateKvDataVersion(JSON.toJSONString(kvDataVersion).getBytes(StandardCharsets.UTF_8)); } + public void setKvDataVersion(DataVersion dataVersion) throws Exception { + this.kvDataVersion = dataVersion; + this.configRocksDBStorage.updateKvDataVersion(JSON.toJSONString(kvDataVersion).getBytes(StandardCharsets.UTF_8)); + } + public DataVersion getKvDataVersion() { return kvDataVersion; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java index f7e0de914d3..c0e1d4cb8b8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java @@ -207,6 +207,16 @@ public void updateDataVersion() { } } + @Override + public void setDataVersion(DataVersion dataVersion) { + try { + rocksDBConfigManager.setKvDataVersion(dataVersion); + } catch (Exception e) { + log.error("set group config dataVersion error", e); + throw new RuntimeException(e); + } + } + protected void decodeForbidden(byte[] key, byte[] body) { String forbiddenGroupName = new String(key, DataConverter.CHARSET_UTF8); JSONObject jsonObject = JSON.parseObject(new String(body, DataConverter.CHARSET_UTF8)); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java index d64f808067c..2936efc84aa 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java @@ -163,4 +163,14 @@ public void updateDataVersion() { throw new RuntimeException(e); } } + + @Override + public void setDataVersion(DataVersion dataVersion) { + try { + rocksDBConfigManager.setKvDataVersion(dataVersion); + } catch (Exception e) { + log.error("set group config dataVersion error", e); + throw new RuntimeException(e); + } + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index ed46dfdc49c..bf049bb4223 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -758,6 +758,8 @@ public void updateDataVersion() { dataVersion.nextVersion(stateMachineVersion); } - + public void setDataVersion(DataVersion dataVersion) { + this.dataVersion.assignNewOne(dataVersion); + } } From 3040e8386ee9282aa8812aaa040facc3d89af78d Mon Sep 17 00:00:00 2001 From: absolute8511 Date: Thu, 23 Oct 2025 15:29:40 +0800 Subject: [PATCH 4/5] feat: set data version directly while sync from master Change-Id: I39e78477a5223b578a4ede3e5cb76f04368d1ca3 --- .../org/apache/rocketmq/broker/slave/SlaveSynchronize.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java index 9d8a5958139..78f78216b5f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java @@ -98,7 +98,7 @@ private void syncTopicConfig() { //update newTopicConfigTable.values().forEach(topicConfigManager::putTopicConfig); - topicConfigManager.getDataVersion().assignNewOne(topicWrapper.getDataVersion()); + topicConfigManager.setDataVersion(topicWrapper.getDataVersion()); topicConfigManager.persist(); } if (topicWrapper.getTopicQueueMappingDetailMap() != null @@ -191,7 +191,7 @@ private void syncSubscriptionGroupConfig() { } // update newSubscriptionGroupTable.values().forEach(subscriptionGroupManager::putSubscriptionGroupConfig); - subscriptionGroupManager.getDataVersion().assignNewOne(subscriptionWrapper.getDataVersion()); + subscriptionGroupManager.setDataVersion(subscriptionWrapper.getDataVersion()); // persist subscriptionGroupManager.persist(); LOGGER.info("Update slave Subscription Group from master, {}", masterAddrBak); From 983802566ee30943659fb8116fc5023b271147de Mon Sep 17 00:00:00 2001 From: absolute8511 Date: Thu, 23 Oct 2025 16:51:06 +0800 Subject: [PATCH 5/5] test: adjust slave sync test for version Change-Id: I9e835568912928ddf6e81816095ee3ed8f93afc0 --- .../broker/slave/SlaveSynchronizeTest.java | 34 +++++++++++++------ 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java index c9461c42240..192448b8978 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java @@ -55,6 +55,8 @@ import java.util.concurrent.ConcurrentHashMap; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) @@ -113,13 +115,9 @@ public void init() { when(brokerController.getMessageStore()).thenReturn(messageStore); when(brokerController.getTimerMessageStore()).thenReturn(timerMessageStore); when(brokerController.getTimerCheckpoint()).thenReturn(timerCheckpoint); - when(topicConfigManager.getDataVersion()).thenReturn(new DataVersion()); - when(topicConfigManager.getTopicConfigTable()).thenReturn(new ConcurrentHashMap<>()); when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager); when(consumerOffsetManager.getOffsetTable()).thenReturn(new ConcurrentHashMap<>()); when(consumerOffsetManager.getDataVersion()).thenReturn(new DataVersion()); - when(subscriptionGroupManager.getDataVersion()).thenReturn(new DataVersion()); - when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(new ConcurrentHashMap<>()); when(queryAssignmentProcessor.getMessageRequestModeManager()).thenReturn(messageRequestModeManager); when(messageRequestModeManager.getMessageRequestModeMap()).thenReturn(new ConcurrentHashMap<>()); when(messageStoreConfig.isTimerWheelEnable()).thenReturn(true); @@ -136,17 +134,31 @@ public void init() { public void testSyncAll() throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException, UnsupportedEncodingException, RemotingCommandException { TopicConfig newTopicConfig = new TopicConfig("NewTopic"); - when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(createTopicConfigWrapper(newTopicConfig)); + TopicConfigAndMappingSerializeWrapper topicConfigWrapper = createTopicConfigWrapper(newTopicConfig); + when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(topicConfigWrapper); when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper()); when(brokerOuterAPI.getAllDelayOffset(anyString())).thenReturn(""); - when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(createSubscriptionGroupWrapper()); + SubscriptionGroupWrapper subscriptionGroupWrapper = createSubscriptionGroupWrapper(); + when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(subscriptionGroupWrapper); when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(createMessageRequestModeWrapper()); when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper()); + + TopicConfigManager topicConfigManager = new TopicConfigManager(); + TopicConfigManager spiedTopicConfigManager = spy(topicConfigManager); + doNothing().when(spiedTopicConfigManager).persist(); + SubscriptionGroupManager groupConfigManager = new SubscriptionGroupManager(); + SubscriptionGroupManager spiedGroupConfigManager = spy(groupConfigManager); + doNothing().when(spiedGroupConfigManager).persist(); + when(brokerController.getTopicConfigManager()).thenReturn(spiedTopicConfigManager); + when(brokerController.getSubscriptionGroupManager()).thenReturn(spiedGroupConfigManager); + slaveSynchronize.syncAll(); - Assert.assertEquals(1, this.brokerController.getTopicConfigManager().getDataVersion().getStateVersion()); - Assert.assertEquals(1, this.brokerController.getTopicQueueMappingManager().getDataVersion().getStateVersion()); + long topicVer = topicConfigWrapper.getDataVersion().getStateVersion(); + long groupVer = subscriptionGroupWrapper.getDataVersion().getStateVersion(); + Assert.assertEquals(topicVer, this.brokerController.getTopicConfigManager().getDataVersion().getStateVersion()); + Assert.assertEquals(topicVer, this.brokerController.getTopicQueueMappingManager().getDataVersion().getStateVersion()); Assert.assertEquals(1, consumerOffsetManager.getDataVersion().getStateVersion()); - Assert.assertEquals(1, subscriptionGroupManager.getDataVersion().getStateVersion()); + Assert.assertEquals(groupVer, this.brokerController.getSubscriptionGroupManager().getDataVersion().getStateVersion()); Assert.assertEquals(1, timerMetrics.getDataVersion().getStateVersion()); } @@ -167,7 +179,7 @@ private TopicConfigAndMappingSerializeWrapper createTopicConfigWrapper(TopicConf wrapper.setTopicConfigTable(new ConcurrentHashMap<>()); wrapper.getTopicConfigTable().put(topicConfig.getTopicName(), topicConfig); DataVersion dataVersion = new DataVersion(); - dataVersion.setStateVersion(1L); + dataVersion.setStateVersion(5L); wrapper.setDataVersion(dataVersion); wrapper.setMappingDataVersion(dataVersion); return wrapper; @@ -186,7 +198,7 @@ private SubscriptionGroupWrapper createSubscriptionGroupWrapper() { SubscriptionGroupWrapper wrapper = new SubscriptionGroupWrapper(); wrapper.setSubscriptionGroupTable(new ConcurrentHashMap<>()); DataVersion dataVersion = new DataVersion(); - dataVersion.setStateVersion(1L); + dataVersion.setStateVersion(5L); wrapper.setDataVersion(dataVersion); return wrapper; }