diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConfigManager.java index 4ebdce13157..f4a1b3562cb 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConfigManager.java @@ -163,6 +163,12 @@ public void updateKvDataVersion() throws Exception { JSON.toJSONString(kvDataVersion).getBytes(StandardCharsets.UTF_8)); } + public void setKvDataVersion(DataVersion dataVersion) throws Exception { + this.kvDataVersion = dataVersion; + this.configRocksDBStorage.put(versionCF, KV_DATA_VERSION_KEY, KV_DATA_VERSION_KEY.length, + JSON.toJSONString(kvDataVersion).getBytes(StandardCharsets.UTF_8)); + } + public DataVersion getKvDataVersion() { return kvDataVersion; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java index f6ae3a3e598..6ebdec69d81 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java @@ -249,6 +249,16 @@ public void updateDataVersion() { } } + @Override + public void setDataVersion(DataVersion dataVersion) { + try { + rocksDBConfigManager.setKvDataVersion(dataVersion); + } catch (Exception e) { + log.error("set group config dataVersion error", e); + throw new RuntimeException(e); + } + } + protected void decodeForbidden(byte[] key, byte[] body) { String forbiddenGroupName = new String(key, RocksDBConfigManager.CHARSET); JSONObject jsonObject = JSON.parseObject(new String(body, RocksDBConfigManager.CHARSET)); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java index 4a8d124e9bf..23ddb2dbb27 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java @@ -204,6 +204,16 @@ public void updateDataVersion() { } } + @Override + public void setDataVersion(DataVersion dataVersion) { + try { + rocksDBConfigManager.setKvDataVersion(dataVersion); + } catch (Exception e) { + log.error("set topic config dataVersion error", e); + throw new RuntimeException(e); + } + } + /** * Migrate data from separate RocksDB instances to the unified RocksDB when useSingleRocksDBForAllConfigs is * enabled. diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java index 2e3134016c7..78f78216b5f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java @@ -83,8 +83,6 @@ private void syncTopicConfig() { TopicConfigManager topicConfigManager = this.brokerController.getTopicConfigManager(); if (!topicConfigManager.getDataVersion().equals(topicWrapper.getDataVersion())) { - topicConfigManager.getDataVersion().assignNewOne(topicWrapper.getDataVersion()); - ConcurrentMap newTopicConfigTable = topicWrapper.getTopicConfigTable(); ConcurrentMap topicConfigTable = topicConfigManager.getTopicConfigTable(); @@ -94,13 +92,13 @@ private void syncTopicConfig() { Map.Entry entry = iterator.next(); if (!newTopicConfigTable.containsKey(entry.getKey())) { iterator.remove(); + topicConfigManager.deleteTopicConfig(entry.getKey()); } - topicConfigManager.deleteTopicConfig(entry.getKey()); } //update newTopicConfigTable.values().forEach(topicConfigManager::putTopicConfig); - topicConfigManager.updateDataVersion(); + topicConfigManager.setDataVersion(topicWrapper.getDataVersion()); topicConfigManager.persist(); } if (topicWrapper.getTopicQueueMappingDetailMap() != null @@ -177,7 +175,6 @@ private void syncSubscriptionGroupConfig() { if (!this.brokerController.getSubscriptionGroupManager().getDataVersion() .equals(subscriptionWrapper.getDataVersion())) { SubscriptionGroupManager subscriptionGroupManager = this.brokerController.getSubscriptionGroupManager(); - subscriptionGroupManager.getDataVersion().assignNewOne(subscriptionWrapper.getDataVersion()); ConcurrentMap curSubscriptionGroupTable = subscriptionGroupManager.getSubscriptionGroupTable(); @@ -189,12 +186,12 @@ private void syncSubscriptionGroupConfig() { Map.Entry configEntry = iterator.next(); if (!newSubscriptionGroupTable.containsKey(configEntry.getKey())) { iterator.remove(); + subscriptionGroupManager.deleteSubscriptionGroupConfig(configEntry.getKey()); } - subscriptionGroupManager.deleteSubscriptionGroupConfig(configEntry.getKey()); } // update newSubscriptionGroupTable.values().forEach(subscriptionGroupManager::putSubscriptionGroupConfig); - subscriptionGroupManager.updateDataVersion(); + subscriptionGroupManager.setDataVersion(subscriptionWrapper.getDataVersion()); // persist subscriptionGroupManager.persist(); LOGGER.info("Update slave Subscription Group from master, {}", masterAddrBak); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index f59651fc8dd..162a89fdc95 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -758,6 +758,8 @@ public void updateDataVersion() { dataVersion.nextVersion(stateMachineVersion); } - + public void setDataVersion(DataVersion dataVersion) { + this.dataVersion.assignNewOne(dataVersion); + } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java index c9461c42240..192448b8978 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java @@ -55,6 +55,8 @@ import java.util.concurrent.ConcurrentHashMap; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) @@ -113,13 +115,9 @@ public void init() { when(brokerController.getMessageStore()).thenReturn(messageStore); when(brokerController.getTimerMessageStore()).thenReturn(timerMessageStore); when(brokerController.getTimerCheckpoint()).thenReturn(timerCheckpoint); - when(topicConfigManager.getDataVersion()).thenReturn(new DataVersion()); - when(topicConfigManager.getTopicConfigTable()).thenReturn(new ConcurrentHashMap<>()); when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager); when(consumerOffsetManager.getOffsetTable()).thenReturn(new ConcurrentHashMap<>()); when(consumerOffsetManager.getDataVersion()).thenReturn(new DataVersion()); - when(subscriptionGroupManager.getDataVersion()).thenReturn(new DataVersion()); - when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(new ConcurrentHashMap<>()); when(queryAssignmentProcessor.getMessageRequestModeManager()).thenReturn(messageRequestModeManager); when(messageRequestModeManager.getMessageRequestModeMap()).thenReturn(new ConcurrentHashMap<>()); when(messageStoreConfig.isTimerWheelEnable()).thenReturn(true); @@ -136,17 +134,31 @@ public void init() { public void testSyncAll() throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException, UnsupportedEncodingException, RemotingCommandException { TopicConfig newTopicConfig = new TopicConfig("NewTopic"); - when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(createTopicConfigWrapper(newTopicConfig)); + TopicConfigAndMappingSerializeWrapper topicConfigWrapper = createTopicConfigWrapper(newTopicConfig); + when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(topicConfigWrapper); when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper()); when(brokerOuterAPI.getAllDelayOffset(anyString())).thenReturn(""); - when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(createSubscriptionGroupWrapper()); + SubscriptionGroupWrapper subscriptionGroupWrapper = createSubscriptionGroupWrapper(); + when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(subscriptionGroupWrapper); when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(createMessageRequestModeWrapper()); when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper()); + + TopicConfigManager topicConfigManager = new TopicConfigManager(); + TopicConfigManager spiedTopicConfigManager = spy(topicConfigManager); + doNothing().when(spiedTopicConfigManager).persist(); + SubscriptionGroupManager groupConfigManager = new SubscriptionGroupManager(); + SubscriptionGroupManager spiedGroupConfigManager = spy(groupConfigManager); + doNothing().when(spiedGroupConfigManager).persist(); + when(brokerController.getTopicConfigManager()).thenReturn(spiedTopicConfigManager); + when(brokerController.getSubscriptionGroupManager()).thenReturn(spiedGroupConfigManager); + slaveSynchronize.syncAll(); - Assert.assertEquals(1, this.brokerController.getTopicConfigManager().getDataVersion().getStateVersion()); - Assert.assertEquals(1, this.brokerController.getTopicQueueMappingManager().getDataVersion().getStateVersion()); + long topicVer = topicConfigWrapper.getDataVersion().getStateVersion(); + long groupVer = subscriptionGroupWrapper.getDataVersion().getStateVersion(); + Assert.assertEquals(topicVer, this.brokerController.getTopicConfigManager().getDataVersion().getStateVersion()); + Assert.assertEquals(topicVer, this.brokerController.getTopicQueueMappingManager().getDataVersion().getStateVersion()); Assert.assertEquals(1, consumerOffsetManager.getDataVersion().getStateVersion()); - Assert.assertEquals(1, subscriptionGroupManager.getDataVersion().getStateVersion()); + Assert.assertEquals(groupVer, this.brokerController.getSubscriptionGroupManager().getDataVersion().getStateVersion()); Assert.assertEquals(1, timerMetrics.getDataVersion().getStateVersion()); } @@ -167,7 +179,7 @@ private TopicConfigAndMappingSerializeWrapper createTopicConfigWrapper(TopicConf wrapper.setTopicConfigTable(new ConcurrentHashMap<>()); wrapper.getTopicConfigTable().put(topicConfig.getTopicName(), topicConfig); DataVersion dataVersion = new DataVersion(); - dataVersion.setStateVersion(1L); + dataVersion.setStateVersion(5L); wrapper.setDataVersion(dataVersion); wrapper.setMappingDataVersion(dataVersion); return wrapper; @@ -186,7 +198,7 @@ private SubscriptionGroupWrapper createSubscriptionGroupWrapper() { SubscriptionGroupWrapper wrapper = new SubscriptionGroupWrapper(); wrapper.setSubscriptionGroupTable(new ConcurrentHashMap<>()); DataVersion dataVersion = new DataVersion(); - dataVersion.setStateVersion(1L); + dataVersion.setStateVersion(5L); wrapper.setDataVersion(dataVersion); return wrapper; }