Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,12 @@ public void updateKvDataVersion() throws Exception {
JSON.toJSONString(kvDataVersion).getBytes(StandardCharsets.UTF_8));
}

public void setKvDataVersion(DataVersion dataVersion) throws Exception {
this.kvDataVersion = dataVersion;
this.configRocksDBStorage.put(versionCF, KV_DATA_VERSION_KEY, KV_DATA_VERSION_KEY.length,
JSON.toJSONString(kvDataVersion).getBytes(StandardCharsets.UTF_8));
}

public DataVersion getKvDataVersion() {
return kvDataVersion;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,16 @@ public void updateDataVersion() {
}
}

@Override
public void setDataVersion(DataVersion dataVersion) {
try {
rocksDBConfigManager.setKvDataVersion(dataVersion);
} catch (Exception e) {
log.error("set group config dataVersion error", e);
throw new RuntimeException(e);
}
}

protected void decodeForbidden(byte[] key, byte[] body) {
String forbiddenGroupName = new String(key, RocksDBConfigManager.CHARSET);
JSONObject jsonObject = JSON.parseObject(new String(body, RocksDBConfigManager.CHARSET));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,16 @@ public void updateDataVersion() {
}
}

@Override
public void setDataVersion(DataVersion dataVersion) {
try {
rocksDBConfigManager.setKvDataVersion(dataVersion);
} catch (Exception e) {
log.error("set topic config dataVersion error", e);
throw new RuntimeException(e);
}
}

/**
* Migrate data from separate RocksDB instances to the unified RocksDB when useSingleRocksDBForAllConfigs is
* enabled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ private void syncTopicConfig() {
TopicConfigManager topicConfigManager = this.brokerController.getTopicConfigManager();
if (!topicConfigManager.getDataVersion().equals(topicWrapper.getDataVersion())) {

topicConfigManager.getDataVersion().assignNewOne(topicWrapper.getDataVersion());

ConcurrentMap<String, TopicConfig> newTopicConfigTable = topicWrapper.getTopicConfigTable();
ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigManager.getTopicConfigTable();

Expand All @@ -94,13 +92,13 @@ private void syncTopicConfig() {
Map.Entry<String, TopicConfig> entry = iterator.next();
if (!newTopicConfigTable.containsKey(entry.getKey())) {
iterator.remove();
topicConfigManager.deleteTopicConfig(entry.getKey());
}
topicConfigManager.deleteTopicConfig(entry.getKey());
}

//update
newTopicConfigTable.values().forEach(topicConfigManager::putTopicConfig);
topicConfigManager.updateDataVersion();
topicConfigManager.setDataVersion(topicWrapper.getDataVersion());
topicConfigManager.persist();
}
if (topicWrapper.getTopicQueueMappingDetailMap() != null
Expand Down Expand Up @@ -177,7 +175,6 @@ private void syncSubscriptionGroupConfig() {
if (!this.brokerController.getSubscriptionGroupManager().getDataVersion()
.equals(subscriptionWrapper.getDataVersion())) {
SubscriptionGroupManager subscriptionGroupManager = this.brokerController.getSubscriptionGroupManager();
subscriptionGroupManager.getDataVersion().assignNewOne(subscriptionWrapper.getDataVersion());

ConcurrentMap<String, SubscriptionGroupConfig> curSubscriptionGroupTable =
subscriptionGroupManager.getSubscriptionGroupTable();
Expand All @@ -189,12 +186,12 @@ private void syncSubscriptionGroupConfig() {
Map.Entry<String, SubscriptionGroupConfig> configEntry = iterator.next();
if (!newSubscriptionGroupTable.containsKey(configEntry.getKey())) {
iterator.remove();
subscriptionGroupManager.deleteSubscriptionGroupConfig(configEntry.getKey());
}
subscriptionGroupManager.deleteSubscriptionGroupConfig(configEntry.getKey());
}
// update
newSubscriptionGroupTable.values().forEach(subscriptionGroupManager::putSubscriptionGroupConfig);
subscriptionGroupManager.updateDataVersion();
subscriptionGroupManager.setDataVersion(subscriptionWrapper.getDataVersion());
// persist
subscriptionGroupManager.persist();
LOGGER.info("Update slave Subscription Group from master, {}", masterAddrBak);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,8 @@ public void updateDataVersion() {
dataVersion.nextVersion(stateMachineVersion);
}


public void setDataVersion(DataVersion dataVersion) {
this.dataVersion.assignNewOne(dataVersion);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import java.util.concurrent.ConcurrentHashMap;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
Expand Down Expand Up @@ -113,13 +115,9 @@ public void init() {
when(brokerController.getMessageStore()).thenReturn(messageStore);
when(brokerController.getTimerMessageStore()).thenReturn(timerMessageStore);
when(brokerController.getTimerCheckpoint()).thenReturn(timerCheckpoint);
when(topicConfigManager.getDataVersion()).thenReturn(new DataVersion());
when(topicConfigManager.getTopicConfigTable()).thenReturn(new ConcurrentHashMap<>());
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
when(consumerOffsetManager.getOffsetTable()).thenReturn(new ConcurrentHashMap<>());
when(consumerOffsetManager.getDataVersion()).thenReturn(new DataVersion());
when(subscriptionGroupManager.getDataVersion()).thenReturn(new DataVersion());
when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(new ConcurrentHashMap<>());
when(queryAssignmentProcessor.getMessageRequestModeManager()).thenReturn(messageRequestModeManager);
when(messageRequestModeManager.getMessageRequestModeMap()).thenReturn(new ConcurrentHashMap<>());
when(messageStoreConfig.isTimerWheelEnable()).thenReturn(true);
Expand All @@ -136,17 +134,31 @@ public void init() {
public void testSyncAll() throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
MQBrokerException, InterruptedException, UnsupportedEncodingException, RemotingCommandException {
TopicConfig newTopicConfig = new TopicConfig("NewTopic");
when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(createTopicConfigWrapper(newTopicConfig));
TopicConfigAndMappingSerializeWrapper topicConfigWrapper = createTopicConfigWrapper(newTopicConfig);
when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(topicConfigWrapper);
when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper());
when(brokerOuterAPI.getAllDelayOffset(anyString())).thenReturn("");
when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(createSubscriptionGroupWrapper());
SubscriptionGroupWrapper subscriptionGroupWrapper = createSubscriptionGroupWrapper();
when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(subscriptionGroupWrapper);
when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(createMessageRequestModeWrapper());
when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper());

TopicConfigManager topicConfigManager = new TopicConfigManager();
TopicConfigManager spiedTopicConfigManager = spy(topicConfigManager);
doNothing().when(spiedTopicConfigManager).persist();
SubscriptionGroupManager groupConfigManager = new SubscriptionGroupManager();
SubscriptionGroupManager spiedGroupConfigManager = spy(groupConfigManager);
doNothing().when(spiedGroupConfigManager).persist();
when(brokerController.getTopicConfigManager()).thenReturn(spiedTopicConfigManager);
when(brokerController.getSubscriptionGroupManager()).thenReturn(spiedGroupConfigManager);

slaveSynchronize.syncAll();
Assert.assertEquals(1, this.brokerController.getTopicConfigManager().getDataVersion().getStateVersion());
Assert.assertEquals(1, this.brokerController.getTopicQueueMappingManager().getDataVersion().getStateVersion());
long topicVer = topicConfigWrapper.getDataVersion().getStateVersion();
long groupVer = subscriptionGroupWrapper.getDataVersion().getStateVersion();
Assert.assertEquals(topicVer, this.brokerController.getTopicConfigManager().getDataVersion().getStateVersion());
Assert.assertEquals(topicVer, this.brokerController.getTopicQueueMappingManager().getDataVersion().getStateVersion());
Assert.assertEquals(1, consumerOffsetManager.getDataVersion().getStateVersion());
Assert.assertEquals(1, subscriptionGroupManager.getDataVersion().getStateVersion());
Assert.assertEquals(groupVer, this.brokerController.getSubscriptionGroupManager().getDataVersion().getStateVersion());
Assert.assertEquals(1, timerMetrics.getDataVersion().getStateVersion());
}

Expand All @@ -167,7 +179,7 @@ private TopicConfigAndMappingSerializeWrapper createTopicConfigWrapper(TopicConf
wrapper.setTopicConfigTable(new ConcurrentHashMap<>());
wrapper.getTopicConfigTable().put(topicConfig.getTopicName(), topicConfig);
DataVersion dataVersion = new DataVersion();
dataVersion.setStateVersion(1L);
dataVersion.setStateVersion(5L);
wrapper.setDataVersion(dataVersion);
wrapper.setMappingDataVersion(dataVersion);
return wrapper;
Expand All @@ -186,7 +198,7 @@ private SubscriptionGroupWrapper createSubscriptionGroupWrapper() {
SubscriptionGroupWrapper wrapper = new SubscriptionGroupWrapper();
wrapper.setSubscriptionGroupTable(new ConcurrentHashMap<>());
DataVersion dataVersion = new DataVersion();
dataVersion.setStateVersion(1L);
dataVersion.setStateVersion(5L);
wrapper.setDataVersion(dataVersion);
return wrapper;
}
Expand Down
Loading