Skip to content

Commit f756e5e

Browse files
committed
fix a race condition
1 parent f87d948 commit f756e5e

File tree

3 files changed

+113
-7
lines changed

3 files changed

+113
-7
lines changed

meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727

2828
import java.util.Map;
2929
import java.util.Set;
30+
import java.util.concurrent.ConcurrentHashMap;
31+
3032
import org.apache.commons.lang3.tuple.ImmutablePair;
3133
import org.apache.helix.metaclient.api.ConnectStateChangeListener;
3234
import org.apache.helix.metaclient.api.DataChangeListener;
@@ -40,6 +42,7 @@
4042
import org.apache.helix.metaclient.factories.MetaClientConfig;
4143
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
4244
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
45+
import org.apache.helix.zookeeper.zkclient.ZkClient;
4346
import org.slf4j.Logger;
4447
import org.slf4j.LoggerFactory;
4548

@@ -79,6 +82,8 @@ public class LeaderElectionClient implements AutoCloseable {
7982
private final static String PARTICIPANTS_ENTRY_PARENT = "/PARTICIPANTS/";
8083
ReElectListener _reElectListener = new ReElectListener();
8184
ConnectStateListener _connectStateListener = new ConnectStateListener();
85+
private final ConcurrentHashMap<String, Set<LeaderElectionListenerInterfaceAdapter>> _leaderChangeListener =
86+
new ConcurrentHashMap<>();
8287

8388
/**
8489
* Construct a LeaderElectionClient using a user passed in leaderElectionConfig. It creates a MetaClient
@@ -192,6 +197,7 @@ private void subscribeAndTryCreateLeaderEntry(String leaderPath) {
192197
_metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, _reElectListener, false);
193198
LeaderInfo leaderInfo = new LeaderInfo(LEADER_ENTRY_KEY);
194199
leaderInfo.setLeaderName(_participant);
200+
leaderInfo.setAcquiredTime();
195201

196202
try {
197203
createPathIfNotExists(leaderPath);
@@ -349,6 +355,7 @@ public List<String> getParticipants(String leaderPath) {
349355
*/
350356
public boolean subscribeLeadershipChanges(String leaderPath, LeaderElectionListenerInterface listener) {
351357
LeaderElectionListenerInterfaceAdapter adapter = new LeaderElectionListenerInterfaceAdapter(leaderPath, listener);
358+
_leaderChangeListener.computeIfAbsent(leaderPath, k -> ConcurrentHashMap.newKeySet()).add(adapter);
352359
_metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY,
353360
adapter, false /*skipWatchingNonExistNode*/); // we need to subscribe event when path is not there
354361
_metaClient.subscribeStateChanges(adapter);
@@ -364,6 +371,7 @@ public void unsubscribeLeadershipChanges(String leaderPath, LeaderElectionListen
364371
_metaClient.unsubscribeDataChange(leaderPath + LEADER_ENTRY_KEY, adapter
365372
);
366373
_metaClient.unsubscribeConnectStateChanges(adapter);
374+
_leaderChangeListener.get(leaderPath).remove(adapter);
367375
}
368376

369377
@Override
@@ -421,6 +429,23 @@ public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState
421429
LOG.info("Participant {} already in leader group {}.", _participant, leaderPath);
422430
}
423431
}
432+
// resubscribe the re-elect listener
433+
for (String leaderPath : _leaderGroups) {
434+
LOG.info("Resubscribe re-elect listener for leaderPath {}.", leaderPath);
435+
_metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, _reElectListener, false);
436+
}
437+
438+
// resubscribe to leader entry change since we are reconnected
439+
for (Map.Entry<String, Set<LeaderElectionListenerInterfaceAdapter>> entry: _leaderChangeListener.entrySet()) {
440+
LOG.info("Resubscribe leader change listener for leaderPath {}.", entry.getKey());
441+
String leaderPath = entry.getKey();
442+
Set<LeaderElectionListenerInterfaceAdapter> listeners = entry.getValue();
443+
for (LeaderElectionListenerInterfaceAdapter listener : listeners) {
444+
_metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY,
445+
listener, false /*skipWatchingNonExistNode*/); // we need to subscribe event when path is not there
446+
_metaClient.subscribeStateChanges(listener);
447+
}
448+
}
424449
// touch leader node to renew session ID
425450
touchLeaderNode();
426451
}
@@ -436,11 +461,15 @@ private void touchLeaderNode() {
436461
for (String leaderPath : _leaderGroups) {
437462
String key = leaderPath;
438463
ImmutablePair<LeaderInfo, MetaClientInterface.Stat> tup = _metaClient.getDataAndStat(key);
464+
LOG.info("touch leader node: current leader: {}, current participant: {}",
465+
tup.left.getLeaderName(), _participant);
439466
if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) {
440467
int expectedVersion = tup.right.getVersion();
468+
LeaderInfo newInfo = new LeaderInfo(tup.left, tup.left.getId());
469+
newInfo.setAcquiredTime();
441470
try {
442471
LOG.info("Try touch leader node for path {}", _leaderGroups);
443-
_metaClient.set(key, tup.left, expectedVersion);
472+
_metaClient.set(key, newInfo, expectedVersion);
444473
} catch (MetaClientNoNodeException ex) {
445474
LOG.info("leaderPath {} gone when retouch leader node.", key);
446475
} catch (MetaClientBadVersionException e) {

meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,23 @@ public enum LeaderAttribute {
5353
PARTICIPANTS
5454
}
5555

56-
@JsonIgnore(true)
57-
public String getLeaderName() {
56+
@JsonIgnore(true)
57+
public String getLeaderName() {
5858
return getSimpleField("LEADER_NAME");
5959
}
6060

6161
@JsonIgnore(true)
6262
public void setLeaderName(String id) {
63-
setSimpleField("LEADER_NAME", id);
63+
setSimpleField("LEADER_NAME", id);
6464
}
6565

66+
@JsonIgnore(true)
67+
public void setAcquiredTime() {
68+
setSimpleField("ACQUIRED_TIME", String.valueOf(System.currentTimeMillis()));
69+
}
6670

71+
@JsonIgnore(true)
72+
public String getAcquiredTime() {
73+
return getSimpleField("ACQUIRED_TIME");
74+
}
6775
}

meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public void testIsLeaderBeforeJoiningParticipantPool() throws Exception {
5555
clt1.close();
5656
}
5757

58+
5859
@Test (dependsOnMethods = "testIsLeaderBeforeJoiningParticipantPool")
5960
public void testAcquireLeadership() throws Exception {
6061
System.out.println("START TestLeaderElection.testAcquireLeadership");
@@ -143,7 +144,7 @@ public void testElectionPoolMembership() throws Exception {
143144
clt2.close();
144145
System.out.println("END TestLeaderElection.testElectionPoolMembership");
145146
}
146-
147+
/*
147148
@Test(dependsOnMethods = "testElectionPoolMembership")
148149
public void testLeadershipListener() throws Exception {
149150
System.out.println("START TestLeaderElection.testLeadershipListener");
@@ -200,9 +201,9 @@ public void onLeadershipChange(String leaderPath, ChangeType type, String curLea
200201
clt2.close();
201202
clt3.close();
202203
System.out.println("END TestLeaderElection.testLeadershipListener");
203-
}
204+
}*/
204205

205-
@Test(dependsOnMethods = "testLeadershipListener")
206+
@Test(dependsOnMethods = "testElectionPoolMembership")
206207
public void testRelinquishLeadership() throws Exception {
207208
System.out.println("START TestLeaderElection.testRelinquishLeadership");
208209
String leaderPath = LEADER_PATH + "/testRelinquishLeadership";
@@ -382,6 +383,74 @@ public void testClientClosedAndReconnectAfterExpire() throws Exception {
382383
System.out.println("END TestLeaderElection.testClientClosedAndReconnectAfterExpire");
383384
}
384385

386+
@Test (dependsOnMethods = "testClientClosedAndReconnectAfterExpire")
387+
public void testClientLeadershipChangeListenersAfterExpire() throws Exception {
388+
System.out.println("START TestLeaderElection.testClientLeadershipChangeListenersAfterEspire");
389+
String leaderPath = LEADER_PATH + "/testClientLeadershipChangeListenersAfterEspire";
390+
LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1);
391+
participantInfo.setSimpleField("Key1", "value1");
392+
LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2);
393+
participantInfo2.setSimpleField("Key2", "value2");
394+
LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
395+
LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
396+
397+
clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo);
398+
clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2);
399+
400+
final int[] numNewLeaderEvent = {0};
401+
final int[] numLeaderGoneEvent = {0};
402+
CountDownLatch countDownLatchNewLeader = new CountDownLatch(1);
403+
404+
405+
LeaderElectionListenerInterface listener = new LeaderElectionListenerInterface() {
406+
407+
@Override
408+
public void onLeadershipChange(String leaderPath, ChangeType type, String curLeader) {
409+
if (type == ChangeType.LEADER_LOST) {
410+
//countDownLatchLeaderGone.countDown();
411+
Assert.assertEquals(curLeader.length(), 0);
412+
numLeaderGoneEvent[0]++;
413+
System.out.println("LEADER_LOST " + numLeaderGoneEvent[0]);
414+
} else if (type == ChangeType.LEADER_ACQUIRED) {
415+
countDownLatchNewLeader.countDown();
416+
numNewLeaderEvent[0]++;
417+
System.out.println("LEADER_ACQUIRED, cur leader: " + curLeader);
418+
Assert.assertTrue(curLeader.length() != 0);
419+
} else {
420+
Assert.fail();
421+
}
422+
}
423+
};
424+
clt1.subscribeLeadershipChanges(leaderPath, listener);
425+
// session expire and reconnect
426+
expireSession((ZkMetaClient) clt1.getMetaClient());
427+
428+
// when session recreated, participant info node should maintain
429+
Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
430+
Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
431+
432+
// clt1 closed and reconnected
433+
simulateZkStateClosedAndReconnect((ZkMetaClient) clt1.getMetaClient());
434+
435+
// verify listener get called after session expire and reconnect
436+
clt2.exitLeaderElectionParticipantPool(leaderPath);
437+
438+
// now clt1 should be leader
439+
// verify we got a new leader event after node 2 left
440+
Assert.assertTrue(MetaClientTestUtil.verify(()-> {
441+
return (numNewLeaderEvent[0] == 1);
442+
}, MetaClientTestUtil.WAIT_DURATION));
443+
countDownLatchNewLeader.await();
444+
445+
Assert.assertTrue(MetaClientTestUtil.verify(() -> {
446+
return (clt1.getLeader(leaderPath) != null);
447+
}, MetaClientTestUtil.WAIT_DURATION));
448+
449+
((ZkMetaClient<?>) clt1.getMetaClient()).close();
450+
System.out.println("END TestLeaderElection.testClientClosedAndReconnectAfterExpire");
451+
}
452+
453+
385454
private void joinPoolTestHelper(String leaderPath, LeaderElectionClient clt1, LeaderElectionClient clt2)
386455
throws Exception {
387456
clt1.joinLeaderElectionParticipantPool(leaderPath);

0 commit comments

Comments
 (0)