Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.helix.metaclient.api.ConnectStateChangeListener;
import org.apache.helix.metaclient.api.DataChangeListener;
Expand All @@ -40,6 +42,7 @@
import org.apache.helix.metaclient.factories.MetaClientConfig;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
import org.apache.helix.zookeeper.zkclient.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -76,9 +79,12 @@ public class LeaderElectionClient implements AutoCloseable {

private final static String LEADER_ENTRY_KEY = "/LEADER";
private final static String PARTICIPANTS_ENTRY_KEY = "/PARTICIPANTS";

private final static String PARTICIPANTS_ENTRY_PARENT = "/PARTICIPANTS/";
ReElectListener _reElectListener = new ReElectListener();
ConnectStateListener _connectStateListener = new ConnectStateListener();
private final ConcurrentHashMap<String, Set<LeaderElectionListenerInterfaceAdapter>> _leaderChangeListeners =
new ConcurrentHashMap<>();

/**
* Construct a LeaderElectionClient using a user passed in leaderElectionConfig. It creates a MetaClient
Expand Down Expand Up @@ -192,6 +198,7 @@ private void subscribeAndTryCreateLeaderEntry(String leaderPath) {
_metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, _reElectListener, false);
LeaderInfo leaderInfo = new LeaderInfo(LEADER_ENTRY_KEY);
leaderInfo.setLeaderName(_participant);
leaderInfo.setAcquiredTime();

try {
createPathIfNotExists(leaderPath);
Expand Down Expand Up @@ -349,6 +356,7 @@ public List<String> getParticipants(String leaderPath) {
*/
public boolean subscribeLeadershipChanges(String leaderPath, LeaderElectionListenerInterface listener) {
LeaderElectionListenerInterfaceAdapter adapter = new LeaderElectionListenerInterfaceAdapter(leaderPath, listener);
_leaderChangeListeners.computeIfAbsent(leaderPath, k -> ConcurrentHashMap.newKeySet()).add(adapter);
_metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY,
adapter, false /*skipWatchingNonExistNode*/); // we need to subscribe event when path is not there
_metaClient.subscribeStateChanges(adapter);
Expand All @@ -364,6 +372,7 @@ public void unsubscribeLeadershipChanges(String leaderPath, LeaderElectionListen
_metaClient.unsubscribeDataChange(leaderPath + LEADER_ENTRY_KEY, adapter
);
_metaClient.unsubscribeConnectStateChanges(adapter);
_leaderChangeListeners.get(leaderPath).remove(adapter);
}

@Override
Expand Down Expand Up @@ -421,6 +430,23 @@ public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState
LOG.info("Participant {} already in leader group {}.", _participant, leaderPath);
}
}
// resubscribe the re-elect listener
for (String leaderPath : _leaderGroups) {
LOG.info("Resubscribe re-elect listener for leaderPath {}.", leaderPath);
_metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, _reElectListener, false);
}

// resubscribe to leader entry change since we are reconnected
for (Map.Entry<String, Set<LeaderElectionListenerInterfaceAdapter>> entry: _leaderChangeListeners.entrySet()) {
LOG.info("Resubscribe leader change listener for leaderPath {}.", entry.getKey());
String leaderPath = entry.getKey();
Set<LeaderElectionListenerInterfaceAdapter> listeners = entry.getValue();
for (LeaderElectionListenerInterfaceAdapter listener : listeners) {
_metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY,
listener, false /*skipWatchingNonExistNode*/); // we need to subscribe event when path is not there
_metaClient.subscribeStateChanges(listener);
}
}
// touch leader node to renew session ID
touchLeaderNode();
}
Expand All @@ -436,11 +462,15 @@ private void touchLeaderNode() {
for (String leaderPath : _leaderGroups) {
String key = leaderPath;
ImmutablePair<LeaderInfo, MetaClientInterface.Stat> tup = _metaClient.getDataAndStat(key);
LOG.info("touch leader node: current leader: {}, current participant: {}",
tup.left.getLeaderName(), _participant);
if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) {
int expectedVersion = tup.right.getVersion();
LeaderInfo newInfo = new LeaderInfo(tup.left, tup.left.getId());
newInfo.setAcquiredTime();
try {
LOG.info("Try touch leader node for path {}", _leaderGroups);
_metaClient.set(key, tup.left, expectedVersion);
_metaClient.set(key, newInfo, expectedVersion);
} catch (MetaClientNoNodeException ex) {
LOG.info("leaderPath {} gone when retouch leader node.", key);
} catch (MetaClientBadVersionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,23 @@ public enum LeaderAttribute {
PARTICIPANTS
}

@JsonIgnore(true)
public String getLeaderName() {
@JsonIgnore(true)
public String getLeaderName() {
return getSimpleField("LEADER_NAME");
}

@JsonIgnore(true)
public void setLeaderName(String id) {
setSimpleField("LEADER_NAME", id);
setSimpleField("LEADER_NAME", id);
}

@JsonIgnore(true)
public void setAcquiredTime() {
setSimpleField("ACQUIRED_TIME", String.valueOf(System.currentTimeMillis()));
}

@JsonIgnore(true)
public String getAcquiredTime() {
return getSimpleField("ACQUIRED_TIME");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public void testIsLeaderBeforeJoiningParticipantPool() throws Exception {
clt1.close();
}


@Test (dependsOnMethods = "testIsLeaderBeforeJoiningParticipantPool")
public void testAcquireLeadership() throws Exception {
System.out.println("START TestLeaderElection.testAcquireLeadership");
Expand Down Expand Up @@ -382,6 +383,74 @@ public void testClientClosedAndReconnectAfterExpire() throws Exception {
System.out.println("END TestLeaderElection.testClientClosedAndReconnectAfterExpire");
}

@Test (dependsOnMethods = "testClientClosedAndReconnectAfterExpire")
public void testClientLeadershipChangeListenersAfterExpire() throws Exception {
System.out.println("START TestLeaderElection.testClientLeadershipChangeListenersAfterEspire");
String leaderPath = LEADER_PATH + "/testClientLeadershipChangeListenersAfterEspire";
LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1);
participantInfo.setSimpleField("Key1", "value1");
LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2);
participantInfo2.setSimpleField("Key2", "value2");
LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);

clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo);
clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2);

final int[] numNewLeaderEvent = {0};
final int[] numLeaderGoneEvent = {0};
CountDownLatch countDownLatchNewLeader = new CountDownLatch(1);


LeaderElectionListenerInterface listener = new LeaderElectionListenerInterface() {

@Override
public void onLeadershipChange(String leaderPath, ChangeType type, String curLeader) {
if (type == ChangeType.LEADER_LOST) {
//countDownLatchLeaderGone.countDown();
Assert.assertEquals(curLeader.length(), 0);
numLeaderGoneEvent[0]++;
System.out.println("LEADER_LOST " + numLeaderGoneEvent[0]);
} else if (type == ChangeType.LEADER_ACQUIRED) {
countDownLatchNewLeader.countDown();
numNewLeaderEvent[0]++;
System.out.println("LEADER_ACQUIRED, cur leader: " + curLeader);
Assert.assertTrue(curLeader.length() != 0);
} else {
Assert.fail();
}
}
};
clt1.subscribeLeadershipChanges(leaderPath, listener);
// session expire and reconnect
expireSession((ZkMetaClient) clt1.getMetaClient());

// when session recreated, participant info node should maintain
Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
Assert.assertEquals(clt2.getParticipantInfo(leaderPath, PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");

// clt1 closed and reconnected
simulateZkStateClosedAndReconnect((ZkMetaClient) clt1.getMetaClient());

// verify listener get called after session expire and reconnect
clt2.exitLeaderElectionParticipantPool(leaderPath);

// now clt1 should be leader
// verify we got a new leader event after node 2 left
Assert.assertTrue(MetaClientTestUtil.verify(()-> {
return (numNewLeaderEvent[0] == 1);
}, MetaClientTestUtil.WAIT_DURATION));
countDownLatchNewLeader.await();

Assert.assertTrue(MetaClientTestUtil.verify(() -> {
return (clt1.getLeader(leaderPath) != null);
}, MetaClientTestUtil.WAIT_DURATION));

((ZkMetaClient<?>) clt1.getMetaClient()).close();
System.out.println("END TestLeaderElection.testClientClosedAndReconnectAfterExpire");
}


private void joinPoolTestHelper(String leaderPath, LeaderElectionClient clt1, LeaderElectionClient clt2)
throws Exception {
clt1.joinLeaderElectionParticipantPool(leaderPath);
Expand Down