Skip to content

Commit 2cc899f

Browse files
authored
[ISSUE #5663] Fix Messages may be lost when SyncStateSet expand in extreme scenarios (#5798)
* Add a new state 'isSynchronizingSyncStateSet' to Solve the problem of missing messages * pass checkstyle * Using readWriteLock to replace synchronized in AutoSwitchHAService * Fix lock issue * Remove unnecessary 'remoteSyncStateSet.clear' * optimize import
1 parent 47355d1 commit 2cc899f

File tree

2 files changed

+144
-55
lines changed

2 files changed

+144
-55
lines changed

store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java

Lines changed: 101 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,6 @@
1717

1818
package org.apache.rocketmq.store.ha.autoswitch;
1919

20-
import java.io.IOException;
21-
import java.nio.channels.SocketChannel;
22-
import java.util.ArrayList;
23-
import java.util.HashSet;
24-
import java.util.List;
25-
import java.util.Map;
26-
import java.util.Set;
27-
import java.util.concurrent.ConcurrentHashMap;
28-
import java.util.concurrent.CopyOnWriteArraySet;
29-
import java.util.concurrent.ExecutorService;
30-
import java.util.concurrent.Executors;
31-
import java.util.concurrent.locks.Lock;
32-
import java.util.concurrent.locks.ReadWriteLock;
33-
import java.util.concurrent.locks.ReentrantReadWriteLock;
34-
import java.util.function.Consumer;
3520
import org.apache.rocketmq.common.ThreadFactoryImpl;
3621
import org.apache.rocketmq.common.constant.LoggerName;
3722
import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
@@ -50,24 +35,44 @@
5035
import org.apache.rocketmq.store.ha.HAConnection;
5136
import org.apache.rocketmq.store.ha.HAConnectionStateNotificationService;
5237

38+
import java.io.IOException;
39+
import java.nio.channels.SocketChannel;
40+
import java.util.ArrayList;
41+
import java.util.HashSet;
42+
import java.util.List;
43+
import java.util.Map;
44+
import java.util.Set;
45+
import java.util.concurrent.ConcurrentHashMap;
46+
import java.util.concurrent.ExecutorService;
47+
import java.util.concurrent.Executors;
48+
import java.util.concurrent.locks.Lock;
49+
import java.util.concurrent.locks.ReadWriteLock;
50+
import java.util.concurrent.locks.ReentrantReadWriteLock;
51+
import java.util.function.Consumer;
52+
5353
/**
5454
* SwitchAble ha service, support switch role to master or slave.
5555
*/
5656
public class AutoSwitchHAService extends DefaultHAService {
5757
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
5858
private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("AutoSwitchHAService_Executor_"));
59-
private final List<Consumer<Set<String>>> syncStateSetChangedListeners = new ArrayList<>();
60-
private final CopyOnWriteArraySet<String> syncStateSet = new CopyOnWriteArraySet<>();
6159
private final ConcurrentHashMap<String, Long> connectionCaughtUpTimeTable = new ConcurrentHashMap<>();
60+
private final List<Consumer<Set<String>>> syncStateSetChangedListeners = new ArrayList<>();
61+
private final Set<String> syncStateSet = new HashSet<>();
62+
private final Set<String> remoteSyncStateSet = new HashSet<>();
63+
private final ReadWriteLock syncStateSetReadWriteLock = new ReentrantReadWriteLock();
64+
private final Lock readLock = syncStateSetReadWriteLock.readLock();
65+
private final Lock writeLock = syncStateSetReadWriteLock.writeLock();
66+
67+
// Indicate whether the syncStateSet is currently in the process of being synchronized to controller.
68+
private volatile boolean isSynchronizingSyncStateSet = false;
6269
private volatile long confirmOffset = -1;
6370

6471
private String localAddress;
6572

6673
private EpochFileCache epochCache;
6774
private AutoSwitchHAClient haClient;
6875

69-
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
70-
7176
public AutoSwitchHAService() {
7277
}
7378

@@ -93,10 +98,11 @@ public void shutdown() {
9398
@Override
9499
public void removeConnection(HAConnection conn) {
95100
if (!defaultMessageStore.isShutdown()) {
96-
final Set<String> syncStateSet = getSyncStateSet();
101+
final Set<String> syncStateSet = getLocalSyncStateSet();
97102
String slave = ((AutoSwitchHAConnection) conn).getSlaveAddress();
98103
if (syncStateSet.contains(slave)) {
99104
syncStateSet.remove(slave);
105+
markSynchronizingSyncStateSet(syncStateSet);
100106
notifySyncStateSetChanged(syncStateSet);
101107
}
102108
}
@@ -232,17 +238,22 @@ public void notifySyncStateSetChanged(final Set<String> newSyncStateSet) {
232238
* A slave will be removed from inSyncStateSet if (curTime - HaConnection.lastCaughtUpTime) > option(haMaxTimeSlaveNotCatchup)
233239
*/
234240
public Set<String> maybeShrinkInSyncStateSet() {
235-
final Set<String> newSyncStateSet = getSyncStateSet();
241+
final Set<String> newSyncStateSet = getLocalSyncStateSet();
242+
boolean isSyncStateSetChanged = false;
236243
final long haMaxTimeSlaveNotCatchup = this.defaultMessageStore.getMessageStoreConfig().getHaMaxTimeSlaveNotCatchup();
237244
for (Map.Entry<String, Long> next : this.connectionCaughtUpTimeTable.entrySet()) {
238245
final String slaveAddress = next.getKey();
239246
if (newSyncStateSet.contains(slaveAddress)) {
240247
final Long lastCaughtUpTimeMs = this.connectionCaughtUpTimeTable.get(slaveAddress);
241248
if ((System.currentTimeMillis() - lastCaughtUpTimeMs) > haMaxTimeSlaveNotCatchup) {
242249
newSyncStateSet.remove(slaveAddress);
250+
isSyncStateSetChanged = true;
243251
}
244252
}
245253
}
254+
if (isSyncStateSetChanged) {
255+
markSynchronizingSyncStateSet(newSyncStateSet);
256+
}
246257
return newSyncStateSet;
247258
}
248259

@@ -251,7 +262,7 @@ public Set<String> maybeShrinkInSyncStateSet() {
251262
* current confirmOffset, and it is caught up to an offset within the current leader epoch.
252263
*/
253264
public void maybeExpandInSyncStateSet(final String slaveAddress, final long slaveMaxOffset) {
254-
final Set<String> currentSyncStateSet = getSyncStateSet();
265+
final Set<String> currentSyncStateSet = getLocalSyncStateSet();
255266
if (currentSyncStateSet.contains(slaveAddress)) {
256267
return;
257268
}
@@ -260,12 +271,33 @@ public void maybeExpandInSyncStateSet(final String slaveAddress, final long slav
260271
final EpochEntry currentLeaderEpoch = this.epochCache.lastEntry();
261272
if (slaveMaxOffset >= currentLeaderEpoch.getStartOffset()) {
262273
currentSyncStateSet.add(slaveAddress);
274+
markSynchronizingSyncStateSet(currentSyncStateSet);
263275
// Notify the upper layer that syncStateSet changed.
264276
notifySyncStateSetChanged(currentSyncStateSet);
265277
}
266278
}
267279
}
268280

281+
private void markSynchronizingSyncStateSet(final Set<String> newSyncStateSet) {
282+
this.writeLock.lock();
283+
try {
284+
this.isSynchronizingSyncStateSet = true;
285+
this.remoteSyncStateSet.clear();
286+
this.remoteSyncStateSet.addAll(newSyncStateSet);
287+
} finally {
288+
this.writeLock.unlock();
289+
}
290+
}
291+
292+
private void markSynchronizingSyncStateSetDone() {
293+
// No need to lock, because the upper-level calling method has already locked write lock
294+
this.isSynchronizingSyncStateSet = false;
295+
}
296+
297+
public boolean isSynchronizingSyncStateSet() {
298+
return isSynchronizingSyncStateSet;
299+
}
300+
269301
public void updateConnectionLastCaughtUpTime(final String slaveAddress, final long lastCaughtUpTimeMs) {
270302
Long prevTime = ConcurrentHashMapUtils.computeIfAbsent(this.connectionCaughtUpTimeTable, slaveAddress, k -> 0L);
271303
this.connectionCaughtUpTimeTable.put(slaveAddress, Math.max(prevTime, lastCaughtUpTimeMs));
@@ -276,7 +308,7 @@ public void updateConnectionLastCaughtUpTime(final String slaveAddress, final lo
276308
*/
277309
public long getConfirmOffset() {
278310
if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
279-
if (this.syncStateSet.size() == 1) {
311+
if (getLocalSyncStateSet().size() == 1) {
280312
return this.defaultMessageStore.getMaxPhyOffset();
281313
}
282314
// First time compute confirmOffset.
@@ -288,19 +320,27 @@ public long getConfirmOffset() {
288320
}
289321

290322
public void updateConfirmOffsetWhenSlaveAck(final String slaveAddress) {
291-
if (this.syncStateSet.contains(slaveAddress)) {
292-
this.confirmOffset = computeConfirmOffset();
323+
this.readLock.lock();
324+
try {
325+
if (this.syncStateSet.contains(slaveAddress)) {
326+
this.confirmOffset = computeConfirmOffset();
327+
}
328+
} finally {
329+
this.readLock.unlock();
293330
}
294331
}
295332

296333
@Override
297334
public int inSyncReplicasNums(final long masterPutWhere) {
298-
final Lock readLock = readWriteLock.readLock();
335+
this.readLock.lock();
299336
try {
300-
readLock.lock();
301-
return syncStateSet.size();
337+
if (this.isSynchronizingSyncStateSet) {
338+
return Math.max(this.syncStateSet.size(), this.remoteSyncStateSet.size());
339+
} else {
340+
return this.syncStateSet.size();
341+
}
302342
} finally {
303-
readLock.unlock();
343+
this.readLock.unlock();
304344
}
305345
}
306346

@@ -322,6 +362,7 @@ public HARuntimeInfo getRuntimeInfo(long masterPutWhere) {
322362

323363
info.setMasterCommitLogMaxOffset(masterPutWhere);
324364

365+
Set<String> localSyncStateSet = getLocalSyncStateSet();
325366
for (HAConnection conn : this.connectionList) {
326367
HARuntimeInfo.HAConnectionRuntimeInfo cInfo = new HARuntimeInfo.HAConnectionRuntimeInfo();
327368

@@ -332,11 +373,11 @@ public HARuntimeInfo getRuntimeInfo(long masterPutWhere) {
332373
cInfo.setTransferredByteInSecond(conn.getTransferredByteInSecond());
333374
cInfo.setTransferFromWhere(conn.getTransferFromWhere());
334375

335-
cInfo.setInSync(syncStateSet.contains(((AutoSwitchHAConnection) conn).getSlaveAddress()));
376+
cInfo.setInSync(localSyncStateSet.contains(((AutoSwitchHAConnection) conn).getSlaveAddress()));
336377

337378
info.getHaConnectionInfo().add(cInfo);
338379
}
339-
info.setInSyncSlaveNums(syncStateSet.size() - 1);
380+
info.setInSyncSlaveNums(localSyncStateSet.size() - 1);
340381
}
341382
return info;
342383
}
@@ -358,26 +399,46 @@ private long computeConfirmOffset() {
358399
}
359400

360401
public void setSyncStateSet(final Set<String> syncStateSet) {
361-
final Lock writeLock = readWriteLock.writeLock();
402+
this.writeLock.lock();
362403
try {
363-
writeLock.lock();
404+
markSynchronizingSyncStateSetDone();
364405
this.syncStateSet.clear();
365406
this.syncStateSet.addAll(syncStateSet);
366407
this.confirmOffset = computeConfirmOffset();
367408
} finally {
368-
writeLock.unlock();
409+
this.writeLock.unlock();
369410
}
370411
}
371412

413+
/**
414+
* Return the union of the local and remote syncStateSets
415+
*/
372416
public Set<String> getSyncStateSet() {
373-
final Lock readLock = readWriteLock.readLock();
417+
this.readLock.lock();
418+
try {
419+
if (this.isSynchronizingSyncStateSet) {
420+
Set<String> unionSyncStateSet = new HashSet<>(this.syncStateSet.size() + this.remoteSyncStateSet.size());
421+
unionSyncStateSet.addAll(this.syncStateSet);
422+
unionSyncStateSet.addAll(this.remoteSyncStateSet);
423+
return unionSyncStateSet;
424+
} else {
425+
HashSet<String> syncStateSet = new HashSet<>(this.syncStateSet.size());
426+
syncStateSet.addAll(this.syncStateSet);
427+
return syncStateSet;
428+
}
429+
} finally {
430+
this.readLock.unlock();
431+
}
432+
}
433+
434+
public Set<String> getLocalSyncStateSet() {
435+
this.readLock.lock();
374436
try {
375-
readLock.lock();
376-
HashSet<String> set = new HashSet<>(this.syncStateSet.size());
377-
set.addAll(this.syncStateSet);
378-
return set;
437+
HashSet<String> localSyncStateSet = new HashSet<>(this.syncStateSet.size());
438+
localSyncStateSet.addAll(this.syncStateSet);
439+
return localSyncStateSet;
379440
} finally {
380-
readLock.unlock();
441+
this.readLock.unlock();
381442
}
382443
}
383444

store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,9 @@
1717

1818
package org.apache.rocketmq.store.ha.autoswitch;
1919

20-
import java.io.File;
21-
import java.net.InetAddress;
22-
import java.net.InetSocketAddress;
23-
import java.net.SocketAddress;
24-
import java.util.Arrays;
25-
import java.util.Collections;
26-
import java.util.HashSet;
27-
import java.util.Random;
28-
import java.util.Set;
29-
import java.util.UUID;
30-
import java.util.concurrent.TimeUnit;
31-
import java.util.concurrent.atomic.AtomicInteger;
32-
import java.util.concurrent.atomic.AtomicReference;
3320
import org.apache.commons.lang3.StringUtils;
3421
import org.apache.rocketmq.common.BrokerConfig;
22+
import org.apache.rocketmq.common.MixAll;
3523
import org.apache.rocketmq.common.UtilAll;
3624
import org.apache.rocketmq.common.message.MessageDecoder;
3725
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
@@ -46,11 +34,25 @@
4634
import org.apache.rocketmq.store.config.MessageStoreConfig;
4735
import org.apache.rocketmq.store.logfile.MappedFile;
4836
import org.apache.rocketmq.store.stats.BrokerStatsManager;
49-
import org.apache.rocketmq.common.MixAll;
5037
import org.junit.After;
38+
import org.junit.Assert;
39+
import org.junit.Assume;
5140
import org.junit.Ignore;
5241
import org.junit.Test;
53-
import org.junit.Assume;
42+
43+
import java.io.File;
44+
import java.net.InetAddress;
45+
import java.net.InetSocketAddress;
46+
import java.net.SocketAddress;
47+
import java.util.Arrays;
48+
import java.util.Collections;
49+
import java.util.HashSet;
50+
import java.util.Random;
51+
import java.util.Set;
52+
import java.util.UUID;
53+
import java.util.concurrent.TimeUnit;
54+
import java.util.concurrent.atomic.AtomicInteger;
55+
import java.util.concurrent.atomic.AtomicReference;
5456

5557
import static org.awaitility.Awaitility.await;
5658
import static org.junit.Assert.assertEquals;
@@ -446,6 +448,32 @@ public void testAddBrokerAndSyncFromLastFile() throws Exception {
446448
checkMessage(messageStore3, 10, 10);
447449
}
448450

451+
@Test
452+
public void testCheckSynchronizingSyncStateSetFlag() throws Exception {
453+
// Step1: broker1 as leader, broker2 as follower
454+
init(defaultMappedFileSize);
455+
((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList("127.0.0.1:8000")));
456+
457+
changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
458+
checkMessage(this.messageStore2, 10, 0);
459+
AutoSwitchHAService masterHAService = (AutoSwitchHAService) this.messageStore1.getHaService();
460+
461+
// Step2: check flag SynchronizingSyncStateSet
462+
Assert.assertTrue(masterHAService.isSynchronizingSyncStateSet());
463+
Assert.assertEquals(masterHAService.getConfirmOffset(), 1570);
464+
Set<String> syncStateSet = masterHAService.getSyncStateSet();
465+
Assert.assertEquals(syncStateSet.size(), 2);
466+
Assert.assertTrue(syncStateSet.contains("127.0.0.1:8001"));
467+
468+
// Step3: set new syncStateSet
469+
HashSet<String> newSyncStateSet = new HashSet<String>() {{
470+
add("127.0.0.1:8000");
471+
add("127.0.0.1:8001");
472+
}};
473+
masterHAService.setSyncStateSet(newSyncStateSet);
474+
Assert.assertFalse(masterHAService.isSynchronizingSyncStateSet());
475+
}
476+
449477
@After
450478
public void destroy() throws Exception {
451479
if (this.messageStore2 != null) {

0 commit comments

Comments
 (0)