Skip to content

Commit 1bb2168

Browse files
authored
Fix controller mode HAService removeConnection (#9897)
* fix controller mode HAService removeConnection * fix store test
1 parent cbfd37b commit 1bb2168

File tree

2 files changed

+26
-10
lines changed

2 files changed

+26
-10
lines changed

store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,19 @@ public void shutdown() {
100100
@Override
101101
public void removeConnection(HAConnection conn) {
102102
if (!defaultMessageStore.isShutdown()) {
103-
final Set<Long> syncStateSet = getLocalSyncStateSet();
104103
Long slave = ((AutoSwitchHAConnection) conn).getSlaveId();
105-
if (syncStateSet.contains(slave)) {
106-
syncStateSet.remove(slave);
107-
markSynchronizingSyncStateSet(syncStateSet);
108-
notifySyncStateSetChanged(syncStateSet);
104+
this.writeLock.lock();
105+
try {
106+
final Set<Long> newSyncStateSet = new HashSet<>(this.syncStateSet);
107+
if (newSyncStateSet.contains(slave)) {
108+
newSyncStateSet.remove(slave);
109+
markSynchronizingSyncStateSet(newSyncStateSet);
110+
notifySyncStateSetChanged(newSyncStateSet);
111+
this.syncStateSet.clear();
112+
this.syncStateSet.addAll(newSyncStateSet);
113+
}
114+
} finally {
115+
this.writeLock.unlock();
109116
}
110117
}
111118
super.removeConnection(conn);

store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -221,12 +221,12 @@ public void testConfirmOffset() throws Exception {
221221
// Step2, shutdown store2
222222
this.messageStore2.shutdown();
223223

224-
// Put message, which should put failed.
224+
// Put message, which should succeed because slave is removed from syncStateSet, only master remains
225225
final PutMessageResult putMessageResult = this.messageStore1.putMessage(buildMessage());
226-
assertEquals(putMessageResult.getPutMessageStatus(), PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
226+
assertEquals(PutMessageStatus.PUT_OK,putMessageResult.getPutMessageStatus());
227227

228-
// The confirmOffset still don't change, because syncStateSet contains broker2, but broker2 shutdown
229-
assertEquals(confirmOffset, this.messageStore1.getConfirmOffset());
228+
// The confirmOffset should update because syncStateSet only contains master after slave shutdown
229+
assertTrue(this.messageStore1.getConfirmOffset() >= confirmOffset);
230230

231231
// Step3, shutdown store1, start store2, change store2 to master, epoch = 2
232232
this.messageStore1.shutdown();
@@ -296,10 +296,19 @@ public void testOptionAllAckInSyncStateSet() throws Exception {
296296
this.messageStore2.shutdown();
297297
this.messageStore2.destroy();
298298

299+
// Wait for connection to be removed and syncStateSet to be updated by removeConnection
300+
await().atMost(10, TimeUnit.SECONDS).until(() -> {
301+
AutoSwitchHAService haService = (AutoSwitchHAService) this.messageStore1.getHaService();
302+
return haService.getConnectionCount().get() == 0
303+
&& haService.getLocalSyncStateSet().size() == 1;
304+
});
305+
306+
// Now manually set syncStateSet back to {1, 2} to test the scenario where
307+
// syncStateSet contains a disconnected slave
299308
((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(result);
300309

301310
final PutMessageResult putMessageResult = this.messageStore1.putMessage(buildMessage());
302-
assertEquals(putMessageResult.getPutMessageStatus(), PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
311+
assertEquals(PutMessageStatus.FLUSH_SLAVE_TIMEOUT,putMessageResult.getPutMessageStatus());
303312
}
304313

305314
@Ignore

0 commit comments

Comments
 (0)