@@ -108,6 +108,13 @@ class ClusterDiscovery::Flags
108108 cv.notify_one ();
109109 }
110110
111+ void wakeup ()
112+ {
113+ std::unique_lock<std::mutex> lk (mu);
114+ any_need_update = true ;
115+ cv.notify_one ();
116+ }
117+
111118private:
112119 std::condition_variable cv;
113120 std::mutex mu;
@@ -391,7 +398,9 @@ bool ClusterDiscovery::upsertCluster(ClusterInfo & cluster_info)
391398 return true ;
392399 };
393400
394- if (!cluster_info.current_node_is_observer && !contains (node_uuids, current_node_name))
401+ if (!cluster_info.current_node_is_observer
402+ && context->isSwarmModeEnabled ()
403+ && !contains (node_uuids, current_node_name))
395404 {
396405 LOG_ERROR (log, " Can't find current node in cluster '{}', will register again" , cluster_info.name );
397406 registerInZk (zk, cluster_info);
@@ -455,12 +464,30 @@ void ClusterDiscovery::registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & inf
455464 return ;
456465 }
457466
467+ if (!context->isSwarmModeEnabled ())
468+ {
469+ LOG_DEBUG (log, " STOP SWARM MODE called, skip self-registering current node {} in cluster {}" , current_node_name, info.name );
470+ return ;
471+ }
472+
458473 LOG_DEBUG (log, " Registering current node {} in cluster {}" , current_node_name, info.name );
459474
460475 zk->createOrUpdate (node_path, info.current_node .serialize (), zkutil::CreateMode::Ephemeral);
461476 LOG_DEBUG (log, " Current node {} registered in cluster {}" , current_node_name, info.name );
462477}
463478
479+ void ClusterDiscovery::unregisterFromZk (zkutil::ZooKeeperPtr & zk, ClusterInfo & info)
480+ {
481+ if (info.current_node_is_observer )
482+ return ;
483+
484+ String node_path = getShardsListPath (info.zk_root ) / current_node_name;
485+ LOG_DEBUG (log, " Removing current node {} from cluster {}" , current_node_name, info.name );
486+
487+ zk->remove (node_path);
488+ LOG_DEBUG (log, " Current node {} removed from cluster {}" , current_node_name, info.name );
489+ }
490+
464491void ClusterDiscovery::initialUpdate ()
465492{
466493 LOG_DEBUG (log, " Initializing" );
@@ -506,6 +533,18 @@ void ClusterDiscovery::initialUpdate()
506533 is_initialized = true ;
507534}
508535
536+ void ClusterDiscovery::registerAll ()
537+ {
538+ register_change_flag = RegisterChangeFlag::RCF_REGISTER_ALL;
539+ clusters_to_update->wakeup ();
540+ }
541+
542+ void ClusterDiscovery::unregisterAll ()
543+ {
544+ register_change_flag = RegisterChangeFlag::RCF_UNREGISTER_ALL;
545+ clusters_to_update->wakeup ();
546+ }
547+
509548void ClusterDiscovery::findDynamicClusters (
510549 std::unordered_map<String, ClusterDiscovery::ClusterInfo> & info,
511550 std::unordered_set<size_t > * unchanged_roots)
@@ -729,6 +768,27 @@ bool ClusterDiscovery::runMainThread(std::function<void()> up_to_date_callback)
729768 {
730769 up_to_date_callback ();
731770 }
771+
772+ RegisterChangeFlag flag = register_change_flag.exchange (RegisterChangeFlag::RCF_NONE);
773+
774+ if (flag == RegisterChangeFlag::RCF_REGISTER_ALL)
775+ {
776+ LOG_DEBUG (log, " Register in all dynamic clusters" );
777+ for (auto & [_, info] : clusters_info)
778+ {
779+ auto zk = context->getDefaultOrAuxiliaryZooKeeper (info.zk_name );
780+ registerInZk (zk, info);
781+ }
782+ }
783+ else if (flag == RegisterChangeFlag::RCF_UNREGISTER_ALL)
784+ {
785+ LOG_DEBUG (log, " Unregister in all dynamic clusters" );
786+ for (auto & [_, info] : clusters_info)
787+ {
788+ auto zk = context->getDefaultOrAuxiliaryZooKeeper (info.zk_name );
789+ unregisterFromZk (zk, info);
790+ }
791+ }
732792 }
733793 LOG_DEBUG (log, " Worker thread stopped" );
734794 return finished;
0 commit comments