Skip to content

Commit c7035cf

Browse files
TakaHiR07nikhil-ctds
authored andcommitted
[fix][broker] rackaware policy is ineffective when delete zk rack info after bkclient initialize (apache#20944)
1 parent 1bbfa8b commit c7035cf

File tree

2 files changed

+19
-2
lines changed

2 files changed

+19
-2
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,6 @@ public synchronized void setConf(Configuration conf) {
121121
store.registerListener(this::handleUpdates);
122122
racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
123123
.orElseGet(BookiesRackConfiguration::new);
124-
updateRacksWithHost(racksWithHost);
125-
watchAvailableBookies();
126124
for (Map<String, BookieInfo> bookieMapping : racksWithHost.values()) {
127125
for (String address : bookieMapping.keySet()) {
128126
bookieAddressListLastTime.add(BookieId.parse(address));
@@ -132,6 +130,8 @@ public synchronized void setConf(Configuration conf) {
132130
bookieAddressListLastTime);
133131
}
134132
}
133+
updateRacksWithHost(racksWithHost);
134+
watchAvailableBookies();
135135
} catch (InterruptedException | ExecutionException | MetadataException e) {
136136
throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list");
137137
}

pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ public void testWithPulsarRegistrationClient() throws Exception {
254254
bkClientConf.getTimeoutTimerNumTicks());
255255

256256
RackawareEnsemblePlacementPolicy repp = new RackawareEnsemblePlacementPolicy();
257+
mapping.registerRackChangeListener(repp);
257258
Class<?> clazz1 = Class.forName("org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy");
258259
Field field1 = clazz1.getDeclaredField("knownBookies");
259260
field1.setAccessible(true);
@@ -323,6 +324,22 @@ public void testWithPulsarRegistrationClient() throws Exception {
323324
assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(), "/rack1");
324325
assertEquals(knownBookies.get(BOOKIE3.toBookieId()).getNetworkLocation(), "/default-rack");
325326

327+
//remove bookie2 rack, the bookie2 rack should be /default-rack
328+
data = "{\"group1\": {\"" + BOOKIE1
329+
+ "\": {\"rack\": \"/rack0\", \"hostname\": \"bookie1.example.com\"}}}";
330+
store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(), Optional.empty()).join();
331+
Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> ((BookiesRackConfiguration)field.get(mapping)).get("group1").size() == 1);
332+
333+
racks = mapping
334+
.resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()))
335+
.stream().filter(Objects::nonNull).toList();
336+
assertEquals(racks.size(), 1);
337+
assertEquals(racks.get(0), "/rack0");
338+
assertEquals(knownBookies.size(), 3);
339+
assertEquals(knownBookies.get(BOOKIE1.toBookieId()).getNetworkLocation(), "/rack0");
340+
assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(), "/default-rack");
341+
assertEquals(knownBookies.get(BOOKIE3.toBookieId()).getNetworkLocation(), "/default-rack");
342+
326343
timer.stop();
327344
}
328345
}

0 commit comments

Comments
 (0)