Skip to content

Commit e60964b

Browse files
author
fanjianye
committed
fix rackaware policy is ineffective after delete zk rack info
1 parent 57fbee4 commit e60964b

File tree

2 files changed

+17
-3
lines changed

2 files changed

+17
-3
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,6 @@ public synchronized void setConf(Configuration conf) {
121121
store.registerListener(this::handleUpdates);
122122
racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
123123
.orElseGet(BookiesRackConfiguration::new);
124-
updateRacksWithHost(racksWithHost);
125-
watchAvailableBookies();
126124
for (Map<String, BookieInfo> bookieMapping : racksWithHost.values()) {
127125
for (String address : bookieMapping.keySet()) {
128126
bookieAddressListLastTime.add(BookieId.parse(address));
@@ -132,6 +130,8 @@ public synchronized void setConf(Configuration conf) {
132130
bookieAddressListLastTime);
133131
}
134132
}
133+
updateRacksWithHost(racksWithHost);
134+
watchAvailableBookies();
135135
} catch (InterruptedException | ExecutionException | MetadataException e) {
136136
throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list");
137137
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ public Object[][] forceMinRackNumProvider() {
6666
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
6767
}
6868

69+
@DataProvider(name = "deleteRackFirstAfterBkClientInitialProvider")
70+
public Object[][] deleteRackFirstAfterBkClientInitialProvider() {
71+
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
72+
}
73+
6974
@Override
7075
protected void configurePulsar(ServiceConfiguration config) throws Exception {
7176
// Start bookies with specific racks
@@ -201,7 +206,8 @@ public void testPlacementMinRackNumsPerWriteQuorum(boolean forceMinRackNums) thr
201206
}
202207
}
203208

204-
public void testRackUpdate() throws Exception {
209+
@Test(dataProvider="deleteRackFirstAfterBkClientInitialProvider")
210+
public void testRackUpdate(boolean isDeleteRackFirst) throws Exception {
205211
// 1. reset configurations for rack-aware
206212
cleanup();
207213
config = new ServiceConfiguration();
@@ -297,6 +303,14 @@ public void testRackUpdate() throws Exception {
297303
lh.close();
298304
}
299305

306+
// rackInfo is deleted first after bkClient initialize
307+
if (isDeleteRackFirst) {
308+
pulsar.getManagedLedgerClientFactory().close();
309+
pulsar.getManagedLedgerClientFactory().initialize(pulsar.getConfig(),
310+
pulsar.getLocalMetadataStore(), pulsar.getBkClientFactory(), pulsar.getIoEventLoopGroup());
311+
bkc = pulsar.getManagedLedgerClientFactory().getBookKeeperClient();
312+
}
313+
300314
// 6. remove rack-0
301315
for (int i = 0; i < NUM_BOOKIES / 2; i++) {
302316
String bookie = servers.get(i).getServer().getLocalAddress().toString();

0 commit comments

Comments
 (0)