@@ -195,7 +195,8 @@ private void createPathIfNotExists(String path) {
195195 }
196196
197197 private void subscribeAndTryCreateLeaderEntry (String leaderPath ) {
198- _metaClient .subscribeDataChange (leaderPath + LEADER_ENTRY_KEY , _reElectListener , false );
198+ _leaderGroups .add (leaderPath + LEADER_ENTRY_KEY );
199+ registerAllListeners ();
199200 LeaderInfo leaderInfo = new LeaderInfo (LEADER_ENTRY_KEY );
200201 leaderInfo .setLeaderName (_participant );
201202 leaderInfo .setAcquiredTime ();
@@ -215,8 +216,6 @@ private void subscribeAndTryCreateLeaderEntry(String leaderPath) {
215216 } catch (MetaClientNodeExistsException ex ) {
216217 LOG .info ("Already a leader in leader group {}." , leaderPath );
217218 }
218-
219- _leaderGroups .add (leaderPath + LEADER_ENTRY_KEY );
220219 }
221220
222221 /**
@@ -430,23 +429,8 @@ public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState
430429 LOG .info ("Participant {} already in leader group {}." , _participant , leaderPath );
431430 }
432431 }
433- // resubscribe the re-elect listener
434- for (String leaderPath : _leaderGroups ) {
435- LOG .info ("Resubscribe re-elect listener for leaderPath {}." , leaderPath );
436- _metaClient .subscribeDataChange (leaderPath + LEADER_ENTRY_KEY , _reElectListener , false );
437- }
438432
439- // resubscribe to leader entry change since we are reconnected
440- for (Map .Entry <String , Set <LeaderElectionListenerInterfaceAdapter >> entry : _leaderChangeListeners .entrySet ()) {
441- LOG .info ("Resubscribe leader change listener for leaderPath {}." , entry .getKey ());
442- String leaderPath = entry .getKey ();
443- Set <LeaderElectionListenerInterfaceAdapter > listeners = entry .getValue ();
444- for (LeaderElectionListenerInterfaceAdapter listener : listeners ) {
445- _metaClient .subscribeDataChange (leaderPath + LEADER_ENTRY_KEY ,
446- listener , false /*skipWatchingNonExistNode*/ ); // we need to subscribe event when path is not there
447- _metaClient .subscribeStateChanges (listener );
448- }
449- }
433+ registerAllListeners ();
450434 // touch leader node to renew session ID
451435 touchLeaderNode ();
452436 }
@@ -482,6 +466,26 @@ private void touchLeaderNode() {
482466 }
483467 }
484468
469+ private void registerAllListeners (){
470+ // resubscribe the re-elect listener
471+ for (String leaderPath : _leaderGroups ) {
472+ LOG .info ("Resubscribe re-elect listener for leaderPath {}." , leaderPath );
473+ _metaClient .subscribeDataChange (leaderPath , _reElectListener , false );
474+ }
475+
476+ // resubscribe to leader entry change since we are reconnected
477+ for (Map .Entry <String , Set <LeaderElectionListenerInterfaceAdapter >> entry : _leaderChangeListeners .entrySet ()) {
478+ LOG .info ("Resubscribe leader change listener for leaderPath {}." , entry .getKey ());
479+ String leaderPath = entry .getKey ();
480+ Set <LeaderElectionListenerInterfaceAdapter > listeners = entry .getValue ();
481+ for (LeaderElectionListenerInterfaceAdapter listener : listeners ) {
482+ _metaClient .subscribeDataChange (leaderPath + LEADER_ENTRY_KEY ,
483+ listener , false /*skipWatchingNonExistNode*/ ); // we need to subscribe event when path is not there
484+ _metaClient .subscribeStateChanges (listener );
485+ }
486+ }
487+ }
488+
485489 public MetaClientInterface getMetaClient () {
486490 return _metaClient ;
487491 }
0 commit comments