Skip to content

Commit 11a80b0

Browse files
committed
Atomic asynchronous stop/start swarm
1 parent 650b0f0 commit 11a80b0

File tree

6 files changed

+74
-27
lines changed

6 files changed

+74
-27
lines changed

src/Interpreters/ClusterDiscovery.cpp

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,13 @@ class ClusterDiscovery::Flags
108108
cv.notify_one();
109109
}
110110

111+
void wakeup()
112+
{
113+
std::unique_lock<std::mutex> lk(mu);
114+
any_need_update = true;
115+
cv.notify_one();
116+
}
117+
111118
private:
112119
std::condition_variable cv;
113120
std::mutex mu;
@@ -527,20 +534,14 @@ void ClusterDiscovery::initialUpdate()
527534

528535
void ClusterDiscovery::registerAll()
529536
{
530-
for (auto & [_, info] : clusters_info)
531-
{
532-
auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name);
533-
registerInZk(zk, info);
534-
}
537+
register_change_flag = RegisterChangeFlag::RCF_REGISTER_ALL;
538+
clusters_to_update->wakeup();
535539
}
536540

537541
void ClusterDiscovery::unregisterAll()
538542
{
539-
for (auto & [_, info] : clusters_info)
540-
{
541-
auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name);
542-
unregisterFromZk(zk, info);
543-
}
543+
register_change_flag = RegisterChangeFlag::RCF_UNREGISTER_ALL;
544+
clusters_to_update->wakeup();
544545
}
545546

546547
void ClusterDiscovery::findDynamicClusters(
@@ -766,6 +767,27 @@ bool ClusterDiscovery::runMainThread(std::function<void()> up_to_date_callback)
766767
{
767768
up_to_date_callback();
768769
}
770+
771+
RegisterChangeFlag flag = register_change_flag.exchange(RegisterChangeFlag::RCF_NONE);
772+
773+
if (flag == RegisterChangeFlag::RCF_REGISTER_ALL)
774+
{
775+
LOG_DEBUG(log, "Register in all dynamic clusters");
776+
for (auto & [_, info] : clusters_info)
777+
{
778+
auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name);
779+
registerInZk(zk, info);
780+
}
781+
}
782+
else if (flag == RegisterChangeFlag::RCF_UNREGISTER_ALL)
783+
{
784+
LOG_DEBUG(log, "Unregister in all dynamic clusters");
785+
for (auto & [_, info] : clusters_info)
786+
{
787+
auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name);
788+
unregisterFromZk(zk, info);
789+
}
790+
}
769791
}
770792
LOG_DEBUG(log, "Worker thread stopped");
771793
return finished;

src/Interpreters/ClusterDiscovery.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,15 @@ class ClusterDiscovery
211211
std::shared_ptr<std::vector<std::shared_ptr<MulticlusterDiscovery>>> multicluster_discovery_paths;
212212

213213
MultiVersion<Macros>::Version macros;
214+
215+
enum RegisterChangeFlag
216+
{
217+
RCF_NONE,
218+
RCF_REGISTER_ALL,
219+
RCF_UNREGISTER_ALL,
220+
};
221+
222+
std::atomic<RegisterChangeFlag> register_change_flag = RegisterChangeFlag::RCF_NONE;
214223
};
215224

216225
}

src/Interpreters/Context.cpp

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,7 @@ struct ContextSharedPart : boost::noncopyable
737737
void shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS
738738
{
739739
swarm_mode_enabled = false;
740+
CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 0);
740741
bool is_shutdown_called = shutdown_called.exchange(true);
741742
if (is_shutdown_called)
742743
return;
@@ -4502,15 +4503,15 @@ std::shared_ptr<Cluster> Context::tryGetCluster(const std::string & cluster_name
45024503
return res;
45034504
}
45044505

4505-
void Context::unregisterInDynamicClusters()
4506+
void Context::unregisterInAutodiscoveryClusters()
45064507
{
45074508
std::lock_guard lock(shared->clusters_mutex);
45084509
if (!shared->cluster_discovery)
45094510
return;
45104511
shared->cluster_discovery->unregisterAll();
45114512
}
45124513

4513-
void Context::registerInDynamicClusters()
4514+
void Context::registerInAutodiscoveryClusters()
45144515
{
45154516
std::lock_guard lock(shared->clusters_mutex);
45164517
if (!shared->cluster_discovery)
@@ -5372,16 +5373,24 @@ void Context::shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS
53725373
shared->shutdown();
53735374
}
53745375

5375-
void Context::stopSwarmMode()
5376+
bool Context::stopSwarmMode()
53765377
{
5377-
CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 0);
5378-
shared->swarm_mode_enabled = false;
5378+
bool expected_is_enabled = true;
5379+
bool is_stopped_now = shared->swarm_mode_enabled.compare_exchange_strong(expected_is_enabled, false);
5380+
if (is_stopped_now)
5381+
CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 0);
5382+
// Returns true only if swarm mode was enabled and this call switched it off.
5383+
return is_stopped_now;
53795384
}
53805385

5381-
void Context::startSwarmMode()
5386+
bool Context::startSwarmMode()
53825387
{
5383-
shared->swarm_mode_enabled = true;
5384-
CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 1);
5388+
bool expected_is_enabled = false;
5389+
bool is_started_now = shared->swarm_mode_enabled.compare_exchange_strong(expected_is_enabled, true);
5390+
if (is_started_now)
5391+
CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 1);
5392+
// return true if start successful
5393+
return is_started_now;
53855394
}
53865395

53875396
bool Context::isSwarmModeEnabled() const

src/Interpreters/Context.h

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1225,8 +1225,8 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
12251225
size_t getClustersVersion() const;
12261226

12271227
void startClusterDiscovery();
1228-
void registerInDynamicClusters();
1229-
void unregisterInDynamicClusters();
1228+
void registerInAutodiscoveryClusters();
1229+
void unregisterInAutodiscoveryClusters();
12301230

12311231
/// Sets custom cluster, but doesn't update configuration
12321232
void setCluster(const String & cluster_name, const std::shared_ptr<Cluster> & cluster);
@@ -1337,9 +1337,13 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
13371337

13381338
void shutdown();
13391339

1340-
/// Stop some works to allow graceful shutdown later
1341-
void stopSwarmMode();
1342-
void startSwarmMode();
1340+
/// Stop some works to allow graceful shutdown later.
1341+
/// Returns true if stop successful.
1342+
bool stopSwarmMode();
1343+
/// Resume some works if we change our mind.
1344+
/// Returns true if start successful.
1345+
bool startSwarmMode();
1346+
/// Return current swarm mode state.
13431347
bool isSwarmModeEnabled() const;
13441348

13451349
bool isInternalQuery() const { return is_internal_query; }

src/Interpreters/InterpreterSystemQuery.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -696,15 +696,15 @@ BlockIO InterpreterSystemQuery::execute()
696696
case Type::STOP_SWARM_MODE:
697697
{
698698
getContext()->checkAccess(AccessType::SYSTEM_SWARM);
699-
getContext()->stopSwarmMode();
700-
getContext()->unregisterInDynamicClusters();
699+
if (getContext()->stopSwarmMode())
700+
getContext()->unregisterInAutodiscoveryClusters();
701701
break;
702702
}
703703
case Type::START_SWARM_MODE:
704704
{
705705
getContext()->checkAccess(AccessType::SYSTEM_SWARM);
706-
getContext()->registerInDynamicClusters();
707-
getContext()->startSwarmMode();
706+
if (getContext()->startSwarmMode())
707+
getContext()->registerInAutodiscoveryClusters();
708708
break;
709709
}
710710
case Type::STOP_FETCHES:

src/QueryPipeline/RemoteQueryExecutorReadContext.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus
7474
}
7575
catch (const Exception & e)
7676
{
77+
/// If a cluster node shuts down unexpectedly (kill/segfault/power off/etc.), the socket just closes.
78+
/// If the initiator has not processed any data packets yet, this error can be ignored.
79+
/// Unprocessed tasks will be executed on other nodes.
7780
if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
7881
&& !read_context.has_data_packets.load() && read_context.executor.skipUnavailableShards())
7982
{

0 commit comments

Comments
 (0)