diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java index fc25289a86..9f8513e1e0 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java @@ -27,6 +27,8 @@ import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.helix.metaclient.api.ConnectStateChangeListener; import org.apache.helix.metaclient.api.DataChangeListener; @@ -40,6 +42,7 @@ import org.apache.helix.metaclient.factories.MetaClientConfig; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory; +import org.apache.helix.zookeeper.zkclient.ZkClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,9 +79,12 @@ public class LeaderElectionClient implements AutoCloseable { private final static String LEADER_ENTRY_KEY = "/LEADER"; private final static String PARTICIPANTS_ENTRY_KEY = "/PARTICIPANTS"; + private final static String PARTICIPANTS_ENTRY_PARENT = "/PARTICIPANTS/"; ReElectListener _reElectListener = new ReElectListener(); ConnectStateListener _connectStateListener = new ConnectStateListener(); + private final ConcurrentHashMap> _leaderChangeListeners = + new ConcurrentHashMap<>(); /** * Construct a LeaderElectionClient using a user passed in leaderElectionConfig. It creates a MetaClient @@ -189,9 +195,11 @@ private void createPathIfNotExists(String path) { } private void subscribeAndTryCreateLeaderEntry(String leaderPath) { - _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, _reElectListener, false); + _leaderGroups.add(leaderPath + LEADER_ENTRY_KEY); + registerAllListeners(); LeaderInfo leaderInfo = new LeaderInfo(LEADER_ENTRY_KEY); leaderInfo.setLeaderName(_participant); + leaderInfo.setAcquiredTime(); try { createPathIfNotExists(leaderPath); @@ -208,8 +216,6 @@ private void subscribeAndTryCreateLeaderEntry(String leaderPath) { } catch (MetaClientNodeExistsException ex) { LOG.info("Already a leader in leader group {}.", leaderPath); } - - _leaderGroups.add(leaderPath + LEADER_ENTRY_KEY); } /** @@ -349,6 +355,7 @@ public List getParticipants(String leaderPath) { */ public boolean subscribeLeadershipChanges(String leaderPath, LeaderElectionListenerInterface listener) { LeaderElectionListenerInterfaceAdapter adapter = new LeaderElectionListenerInterfaceAdapter(leaderPath, listener); + _leaderChangeListeners.computeIfAbsent(leaderPath + LEADER_ENTRY_KEY, k -> ConcurrentHashMap.newKeySet()).add(adapter); _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, adapter, false /*skipWatchingNonExistNode*/); // we need to subscribe event when path is not there _metaClient.subscribeStateChanges(adapter); @@ -364,6 +371,7 @@ public void unsubscribeLeadershipChanges(String leaderPath, LeaderElectionListen _metaClient.unsubscribeDataChange(leaderPath + LEADER_ENTRY_KEY, adapter ); _metaClient.unsubscribeConnectStateChanges(adapter); + _leaderChangeListeners.get(leaderPath + LEADER_ENTRY_KEY).remove(adapter); } @Override @@ -421,6 +429,8 @@ public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState LOG.info("Participant {} already in leader group {}.", _participant, leaderPath); } } + + registerAllListeners(); // touch leader node to renew session ID touchLeaderNode(); } @@ -436,11 +446,15 @@ private void touchLeaderNode() { for (String leaderPath : _leaderGroups) { String key = leaderPath; ImmutablePair tup = _metaClient.getDataAndStat(key); + LOG.info("touch leader node: current leader: {}, current participant: {}", + tup.left.getLeaderName(), _participant); if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) { int expectedVersion = tup.right.getVersion(); + LeaderInfo newInfo = new LeaderInfo(tup.left, tup.left.getId()); + newInfo.setAcquiredTime(); try { LOG.info("Try touch leader node for path {}", _leaderGroups); - _metaClient.set(key, tup.left, expectedVersion); + _metaClient.set(key, newInfo, expectedVersion); } catch (MetaClientNoNodeException ex) { LOG.info("leaderPath {} gone when retouch leader node.", key); } catch (MetaClientBadVersionException e) { @@ -452,6 +466,26 @@ private void touchLeaderNode() { } } + private void registerAllListeners(){ + // resubscribe the re-elect listener + for (String leaderPath : _leaderGroups) { + LOG.info("Subscribe re-elect listener for leaderPath {}.", leaderPath); + _metaClient.subscribeDataChange(leaderPath, _reElectListener, false); + } + + // resubscribe to leader entry change since we are reconnected + for (Map.Entry> entry: _leaderChangeListeners.entrySet()) { + String leaderPath = entry.getKey(); + LOG.info("Subscribe leader change listener for leaderPath {}.",leaderPath); + Set listeners = entry.getValue(); + for (LeaderElectionListenerInterfaceAdapter listener : listeners) { + _metaClient.subscribeDataChange(leaderPath, + listener, false /*skipWatchingNonExistNode*/); // we need to subscribe event when path is not there + _metaClient.subscribeStateChanges(listener); + } + } + } + public MetaClientInterface getMetaClient() { return _metaClient; } diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java index ab0562502d..2e1a4b2a36 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java @@ -53,15 +53,23 @@ public enum LeaderAttribute { PARTICIPANTS } -@JsonIgnore(true) -public String getLeaderName() { + @JsonIgnore(true) + public String getLeaderName() { return getSimpleField("LEADER_NAME"); } @JsonIgnore(true) public void setLeaderName(String id) { - setSimpleField("LEADER_NAME", id); + setSimpleField("LEADER_NAME", id); } + @JsonIgnore(true) + public void setAcquiredTime() { + setSimpleField("ACQUIRED_TIME", String.valueOf(System.currentTimeMillis())); + } + @JsonIgnore(true) + public String getAcquiredTime() { + return getSimpleField("ACQUIRED_TIME"); + } } diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java index c7aa06eff1..8bb4841e72 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java @@ -55,6 +55,7 @@ public void testIsLeaderBeforeJoiningParticipantPool() throws Exception { clt1.close(); } + @Test (dependsOnMethods = "testIsLeaderBeforeJoiningParticipantPool") public void testAcquireLeadership() throws Exception { System.out.println("START TestLeaderElection.testAcquireLeadership"); @@ -382,6 +383,94 @@ public void testClientClosedAndReconnectAfterExpire() throws Exception { System.out.println("END TestLeaderElection.testClientClosedAndReconnectAfterExpire"); } + @Test (dependsOnMethods = "testClientClosedAndReconnectAfterExpire") + public void testClientLeadershipChangeListenersAfterExpire() throws Exception { + System.out.println("START TestLeaderElection.testClientLeadershipChangeListenersAfterEspire"); + String leaderPath = LEADER_PATH + "/testClientLeadershipChangeListenersAfterEspire"; + LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1); + participantInfo.setSimpleField("Key1", "value1"); + LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2); + participantInfo2.setSimpleField("Key2", "value2"); + LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1); + LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2); + + clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo); + clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2); + + final int[] numNewLeaderEvent = {0}; + final int[] numLeaderGoneEvent = {0}; + CountDownLatch countDownLatchNewLeader = new CountDownLatch(1); + + + LeaderElectionListenerInterface listener = new LeaderElectionListenerInterface() { + + @Override + public void onLeadershipChange(String leaderPath, ChangeType type, String curLeader) { + if (type == ChangeType.LEADER_LOST) { + //countDownLatchLeaderGone.countDown(); + Assert.assertEquals(curLeader.length(), 0); + numLeaderGoneEvent[0]++; + System.out.println("LEADER_LOST " + numLeaderGoneEvent[0]); + } else if (type == ChangeType.LEADER_ACQUIRED) { + countDownLatchNewLeader.countDown(); + numNewLeaderEvent[0]++; + System.out.println("LEADER_ACQUIRED, cur leader: " + curLeader); + Assert.assertTrue(curLeader.length() != 0); + } else { + Assert.fail(); + } + } + }; + clt1.subscribeLeadershipChanges(leaderPath, listener); + // session expire and reconnect + expireSession((ZkMetaClient) clt1.getMetaClient()); + + // when session recreated, participant info node should maintain + Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME1).getSimpleField("Key1"), "value1"); + Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME2).getSimpleField("Key2"), "value2"); + + // clt1 closed and reconnected + simulateZkStateClosedAndReconnect((ZkMetaClient) clt1.getMetaClient()); + + // verify listener get called after session expire and reconnect + clt2.exitLeaderElectionParticipantPool(leaderPath); + + // now clt1 should be leader + // verify we got a new leader event after node 2 left + Assert.assertTrue(MetaClientTestUtil.verify(()-> { + return (numNewLeaderEvent[0] == 1); + }, MetaClientTestUtil.WAIT_DURATION)); + countDownLatchNewLeader.await(); + + Assert.assertTrue(MetaClientTestUtil.verify(() -> { + return (clt1.getLeader(leaderPath) != null); + }, MetaClientTestUtil.WAIT_DURATION)); + + // have clt2 join and clt1 leave and join again, and verify listener still works + clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2); + clt1.exitLeaderElectionParticipantPool(leaderPath); + + Assert.assertTrue(MetaClientTestUtil.verify(() -> { + return (clt2.getLeader(leaderPath) != null); + }, MetaClientTestUtil.WAIT_DURATION)); + Assert.assertTrue(MetaClientTestUtil.verify(() -> { + return (clt2.getLeader(leaderPath).equals(PARTICIPANT_NAME2)); + }, MetaClientTestUtil.WAIT_DURATION)); + + // now clt1 join again, and verify listener still works + clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo); + clt2.exitLeaderElectionParticipantPool(leaderPath); + // verify clt1 is leader + Assert.assertTrue(MetaClientTestUtil.verify(() -> { + return (clt1.getLeader(leaderPath) != null); + }, MetaClientTestUtil.WAIT_DURATION)); + + + ((ZkMetaClient) clt1.getMetaClient()).close(); + System.out.println("END TestLeaderElection.testClientClosedAndReconnectAfterExpire"); + } + + private void joinPoolTestHelper(String leaderPath, LeaderElectionClient clt1, LeaderElectionClient clt2) throws Exception { clt1.joinLeaderElectionParticipantPool(leaderPath);