@@ -195,7 +195,8 @@ private void createPathIfNotExists(String path) {
195195 }
196196
197197 private void subscribeAndTryCreateLeaderEntry (String leaderPath ) {
198- _metaClient .subscribeDataChange (leaderPath + LEADER_ENTRY_KEY , _reElectListener , false );
198+ _leaderGroups .add (leaderPath + LEADER_ENTRY_KEY );
199+ registerAllListeners ();
199200 LeaderInfo leaderInfo = new LeaderInfo (LEADER_ENTRY_KEY );
200201 leaderInfo .setLeaderName (_participant );
201202 leaderInfo .setAcquiredTime ();
@@ -215,8 +216,6 @@ private void subscribeAndTryCreateLeaderEntry(String leaderPath) {
215216 } catch (MetaClientNodeExistsException ex ) {
216217 LOG .info ("Already a leader in leader group {}." , leaderPath );
217218 }
218-
219- _leaderGroups .add (leaderPath + LEADER_ENTRY_KEY );
220219 }
221220
222221 /**
@@ -356,7 +355,7 @@ public List<String> getParticipants(String leaderPath) {
356355 */
357356 public boolean subscribeLeadershipChanges (String leaderPath , LeaderElectionListenerInterface listener ) {
358357 LeaderElectionListenerInterfaceAdapter adapter = new LeaderElectionListenerInterfaceAdapter (leaderPath , listener );
359- _leaderChangeListeners .computeIfAbsent (leaderPath , k -> ConcurrentHashMap .newKeySet ()).add (adapter );
358+ _leaderChangeListeners .computeIfAbsent (leaderPath + LEADER_ENTRY_KEY , k -> ConcurrentHashMap .newKeySet ()).add (adapter );
360359 _metaClient .subscribeDataChange (leaderPath + LEADER_ENTRY_KEY ,
361360 adapter , false /*skipWatchingNonExistNode*/ ); // we need to subscribe event when path is not there
362361 _metaClient .subscribeStateChanges (adapter );
@@ -372,7 +371,7 @@ public void unsubscribeLeadershipChanges(String leaderPath, LeaderElectionListen
372371 _metaClient .unsubscribeDataChange (leaderPath + LEADER_ENTRY_KEY , adapter
373372 );
374373 _metaClient .unsubscribeConnectStateChanges (adapter );
375- _leaderChangeListeners .get (leaderPath ).remove (adapter );
374+ _leaderChangeListeners .get (leaderPath + LEADER_ENTRY_KEY ).remove (adapter );
376375 }
377376
378377 @ Override
@@ -430,23 +429,8 @@ public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState
430429 LOG .info ("Participant {} already in leader group {}." , _participant , leaderPath );
431430 }
432431 }
433- // resubscribe the re-elect listener
434- for (String leaderPath : _leaderGroups ) {
435- LOG .info ("Resubscribe re-elect listener for leaderPath {}." , leaderPath );
436- _metaClient .subscribeDataChange (leaderPath + LEADER_ENTRY_KEY , _reElectListener , false );
437- }
438432
439- // resubscribe to leader entry change since we are reconnected
440- for (Map .Entry <String , Set <LeaderElectionListenerInterfaceAdapter >> entry : _leaderChangeListeners .entrySet ()) {
441- LOG .info ("Resubscribe leader change listener for leaderPath {}." , entry .getKey ());
442- String leaderPath = entry .getKey ();
443- Set <LeaderElectionListenerInterfaceAdapter > listeners = entry .getValue ();
444- for (LeaderElectionListenerInterfaceAdapter listener : listeners ) {
445- _metaClient .subscribeDataChange (leaderPath + LEADER_ENTRY_KEY ,
446- listener , false /*skipWatchingNonExistNode*/ ); // we need to subscribe event when path is not there
447- _metaClient .subscribeStateChanges (listener );
448- }
449- }
433+ registerAllListeners ();
450434 // touch leader node to renew session ID
451435 touchLeaderNode ();
452436 }
@@ -482,6 +466,26 @@ private void touchLeaderNode() {
482466 }
483467 }
484468
469+ private void registerAllListeners (){
470+ // resubscribe the re-elect listener
471+ for (String leaderPath : _leaderGroups ) {
472+ LOG .info ("Subscribe re-elect listener for leaderPath {}." , leaderPath );
473+ _metaClient .subscribeDataChange (leaderPath , _reElectListener , false );
474+ }
475+
476+ // resubscribe to leader entry change since we are reconnected
477+ for (Map .Entry <String , Set <LeaderElectionListenerInterfaceAdapter >> entry : _leaderChangeListeners .entrySet ()) {
478+ String leaderPath = entry .getKey ();
479+ LOG .info ("Subscribe leader change listener for leaderPath {}." ,leaderPath );
480+ Set <LeaderElectionListenerInterfaceAdapter > listeners = entry .getValue ();
481+ for (LeaderElectionListenerInterfaceAdapter listener : listeners ) {
482+ _metaClient .subscribeDataChange (leaderPath ,
483+ listener , false /*skipWatchingNonExistNode*/ ); // we need to subscribe event when path is not there
484+ _metaClient .subscribeStateChanges (listener );
485+ }
486+ }
487+ }
488+
485489 public MetaClientInterface getMetaClient () {
486490 return _metaClient ;
487491 }
0 commit comments