7171import org .apache .fluss .server .coordinator .event .CoordinatorEventManager ;
7272import org .apache .fluss .server .coordinator .event .CreatePartitionEvent ;
7373import org .apache .fluss .server .coordinator .event .CreateTableEvent ;
74+ import org .apache .fluss .server .coordinator .event .DeadCoordinatorEvent ;
7475import org .apache .fluss .server .coordinator .event .DeadTabletServerEvent ;
7576import org .apache .fluss .server .coordinator .event .DeleteReplicaResponseReceivedEvent ;
7677import org .apache .fluss .server .coordinator .event .DropPartitionEvent ;
7778import org .apache .fluss .server .coordinator .event .DropTableEvent ;
7879import org .apache .fluss .server .coordinator .event .EventProcessor ;
7980import org .apache .fluss .server .coordinator .event .FencedCoordinatorEvent ;
8081import org .apache .fluss .server .coordinator .event .ListRebalanceProgressEvent ;
82+ import org .apache .fluss .server .coordinator .event .NewCoordinatorEvent ;
8183import org .apache .fluss .server .coordinator .event .NewTabletServerEvent ;
8284import org .apache .fluss .server .coordinator .event .NotifyKvSnapshotOffsetEvent ;
8385import org .apache .fluss .server .coordinator .event .NotifyLakeTableOffsetEvent ;
8688import org .apache .fluss .server .coordinator .event .RemoveServerTagEvent ;
8789import org .apache .fluss .server .coordinator .event .SchemaChangeEvent ;
8890import org .apache .fluss .server .coordinator .event .TableRegistrationChangeEvent ;
91+ import org .apache .fluss .server .coordinator .event .watcher .CoordinatorChangeWatcher ;
8992import org .apache .fluss .server .coordinator .event .watcher .TableChangeWatcher ;
9093import org .apache .fluss .server .coordinator .event .watcher .TabletServerChangeWatcher ;
9194import org .apache .fluss .server .coordinator .lease .KvSnapshotLeaseManager ;
@@ -172,6 +175,7 @@ public class CoordinatorEventProcessor implements EventProcessor {
172175 private final LakeTableTieringManager lakeTableTieringManager ;
173176 private final TableChangeWatcher tableChangeWatcher ;
174177 private final CoordinatorChannelManager coordinatorChannelManager ;
178+ private final CoordinatorChangeWatcher coordinatorChangeWatcher ;
175179 private final TabletServerChangeWatcher tabletServerChangeWatcher ;
176180 private final CoordinatorMetadataCache serverMetadataCache ;
177181 private final CoordinatorRequestBatch coordinatorRequestBatch ;
@@ -224,6 +228,8 @@ public CoordinatorEventProcessor(
224228 tableBucketStateMachine ,
225229 new RemoteStorageCleaner (conf , ioExecutor ),
226230 ioExecutor );
231+ this .coordinatorChangeWatcher =
232+ new CoordinatorChangeWatcher (zooKeeperClient , coordinatorEventManager );
227233 this .tableChangeWatcher = new TableChangeWatcher (zooKeeperClient , coordinatorEventManager );
228234 this .tabletServerChangeWatcher =
229235 new TabletServerChangeWatcher (zooKeeperClient , coordinatorEventManager );
@@ -263,6 +269,7 @@ public CoordinatorContext getCoordinatorContext() {
263269 public void startup () {
264270 coordinatorContext .setCoordinatorServerInfo (getCoordinatorServerInfo ());
265271 // start watchers first so that we won't miss node in zk;
272+ coordinatorChangeWatcher .start ();
266273 tabletServerChangeWatcher .start ();
267274 tableChangeWatcher .start ();
268275 LOG .info ("Initializing coordinator context." );
@@ -306,12 +313,9 @@ public void shutdown() {
306313 private ServerInfo getCoordinatorServerInfo () {
307314 try {
308315 return zooKeeperClient
309- .getCoordinatorAddress ()
316+ .getCoordinatorLeaderAddress ()
310317 .map (
311318 coordinatorAddress ->
312- // TODO we set id to 0 as that CoordinatorServer don't support
313- // HA, if we support HA, we need to set id to the config
314- // CoordinatorServer id to avoid node drift.
315319 new ServerInfo (
316320 0 ,
317321 null , // For coordinatorServer, no rack info
@@ -334,6 +338,11 @@ public int getCoordinatorEpoch() {
334338
335339 private void initCoordinatorContext () throws Exception {
336340 long start = System .currentTimeMillis ();
341+ // get all coordinator servers
342+ List <String > currentCoordinatorServers = zooKeeperClient .getCoordinatorServerList ();
343+ coordinatorContext .setLiveCoordinators (new HashSet <>(currentCoordinatorServers ));
344+ LOG .info ("Load coordinator servers success when initializing coordinator context." );
345+
337346 // get all tablet server's
338347 int [] currentServers = zooKeeperClient .getSortedTabletServerList ();
339348 List <ServerInfo > tabletServerInfos = new ArrayList <>();
@@ -548,6 +557,7 @@ private void onShutdown() {
548557 tableManager .shutdown ();
549558
550559 // then stop watchers
560+ coordinatorChangeWatcher .stop ();
551561 tableChangeWatcher .stop ();
552562 tabletServerChangeWatcher .stop ();
553563 }
@@ -572,6 +582,10 @@ public void process(CoordinatorEvent event) {
572582 (NotifyLeaderAndIsrResponseReceivedEvent ) event );
573583 } else if (event instanceof DeleteReplicaResponseReceivedEvent ) {
574584 processDeleteReplicaResponseReceived ((DeleteReplicaResponseReceivedEvent ) event );
585+ } else if (event instanceof NewCoordinatorEvent ) {
586+ processNewCoordinator ((NewCoordinatorEvent ) event );
587+ } else if (event instanceof DeadCoordinatorEvent ) {
588+ processDeadCoordinator ((DeadCoordinatorEvent ) event );
575589 } else if (event instanceof NewTabletServerEvent ) {
576590 processNewTabletServer ((NewTabletServerEvent ) event );
577591 } else if (event instanceof DeadTabletServerEvent ) {
@@ -984,6 +998,28 @@ private void onReplicaBecomeOffline(Set<TableBucketReplica> offlineReplicas) {
984998 replicaStateMachine .handleStateChanges (offlineReplicas , OfflineReplica );
985999 }
9861000
1001+ private void processNewCoordinator (NewCoordinatorEvent newCoordinatorEvent ) {
1002+ String coordinatorServerId = newCoordinatorEvent .getServerId ();
1003+ if (coordinatorContext .getLiveCoordinatorServers ().contains (coordinatorServerId )) {
1004+ return ;
1005+ }
1006+
1007+ // process new coordinator server
1008+ LOG .info ("New coordinator server callback for coordinator server {}" , coordinatorServerId );
1009+
1010+ coordinatorContext .addLiveCoordinator (coordinatorServerId );
1011+ }
1012+
1013+ private void processDeadCoordinator (DeadCoordinatorEvent deadCoordinatorEvent ) {
1014+ String coordinatorServerId = deadCoordinatorEvent .getServerId ();
1015+ if (!coordinatorContext .getLiveCoordinatorServers ().contains (coordinatorServerId )) {
1016+ return ;
1017+ }
1018+ // process dead coordinator server
1019+ LOG .info ("Coordinator server failure callback for {}." , coordinatorServerId );
1020+ coordinatorContext .removeLiveCoordinator (coordinatorServerId );
1021+ }
1022+
9871023 private void processNewTabletServer (NewTabletServerEvent newTabletServerEvent ) {
9881024 // NOTE: we won't need to detect bounced tablet servers like Kafka as we won't
9891025 // miss the event of tablet server un-register and register again since we can
0 commit comments