Skip to content

Commit e1373ee

Browse files
Enmk and ianton-ru
authored and committed
Merge pull request #1014 from Altinity/feature/system_preshutdown_v2
SYSTEM STOP SWARM MODE command for graceful shutdown swarm node merge attempt v2
1 parent 1f821d4 commit e1373ee

34 files changed

+311
-11
lines changed

docs/en/sql-reference/statements/system.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,12 @@ SYSTEM RELOAD USERS [ON CLUSTER cluster_name]
206206
207207
Normally shuts down ClickHouse (like `service clickhouse-server stop` / `kill {$pid_clickhouse-server}`)
208208
209+
## PRESHUTDOWN {#preshutdown}
210+
211+
<CloudNotSupportedBadge/>
212+
213+
Prepares the node for a graceful shutdown: unregisters it from autodiscovered clusters and stops accepting distributed requests to object storages (s3Cluster, icebergCluster, etc.).
214+
209215
## KILL {#kill}
210216
211217
Aborts ClickHouse process (like `kill -9 {$pid_clickhouse-server}`)

programs/server/Server.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2329,6 +2329,8 @@ try
23292329

23302330
}
23312331

2332+
global_context->startSwarmMode();
2333+
23322334
{
23332335
std::lock_guard lock(servers_lock);
23342336
/// We should start interserver communications before (and more important shutdown after) tables.
@@ -2777,6 +2779,8 @@ try
27772779

27782780
is_cancelled = true;
27792781

2782+
global_context->stopSwarmMode();
2783+
27802784
LOG_DEBUG(log, "Waiting for current connections to close.");
27812785

27822786
size_t current_connections = 0;

src/Access/Common/AccessType.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,7 @@ enum class AccessType : uint8_t
334334
M(SYSTEM_TTL_MERGES, "SYSTEM STOP TTL MERGES, SYSTEM START TTL MERGES, STOP TTL MERGES, START TTL MERGES", TABLE, SYSTEM) \
335335
M(SYSTEM_FETCHES, "SYSTEM STOP FETCHES, SYSTEM START FETCHES, STOP FETCHES, START FETCHES", TABLE, SYSTEM) \
336336
M(SYSTEM_MOVES, "SYSTEM STOP MOVES, SYSTEM START MOVES, STOP MOVES, START MOVES", TABLE, SYSTEM) \
337+
M(SYSTEM_SWARM, "SYSTEM STOP SWARM MODE, SYSTEM START SWARM MODE, STOP SWARM MODE, START SWARM MODE", GLOBAL, SYSTEM) \
337338
M(SYSTEM_PULLING_REPLICATION_LOG, "SYSTEM STOP PULLING REPLICATION LOG, SYSTEM START PULLING REPLICATION LOG", TABLE, SYSTEM) \
338339
M(SYSTEM_CLEANUP, "SYSTEM STOP CLEANUP, SYSTEM START CLEANUP", TABLE, SYSTEM) \
339340
M(SYSTEM_VIEWS, "SYSTEM REFRESH VIEW, SYSTEM START VIEWS, SYSTEM STOP VIEWS, SYSTEM START VIEW, SYSTEM STOP VIEW, SYSTEM CANCEL VIEW, REFRESH VIEW, START VIEWS, STOP VIEWS, START VIEW, STOP VIEW, CANCEL VIEW", VIEW, SYSTEM) \

src/Common/CurrentMetrics.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,7 @@
431431
M(StartupScriptsExecutionState, "State of startup scripts execution: 0 = not finished, 1 = success, 2 = failure.") \
432432
\
433433
M(IsServerShuttingDown, "Indicates if the server is shutting down: 0 = no, 1 = yes") \
434+
M(IsSwarmModeEnabled, "Indicates if the swarm mode enabled or not: 0 = disabled, 1 = enabled") \
434435
\
435436
M(StatelessWorkerThreads, "Number of threads in the stateless worker thread pool.") \
436437
M(StatelessWorkerThreadsActive, "Number of threads in the stateless worker thread pool running a task.") \

src/Interpreters/ClusterDiscovery.cpp

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,13 @@ class ClusterDiscovery::Flags
108108
cv.notify_one();
109109
}
110110

111+
void wakeup()
112+
{
113+
std::unique_lock<std::mutex> lk(mu);
114+
any_need_update = true;
115+
cv.notify_one();
116+
}
117+
111118
private:
112119
std::condition_variable cv;
113120
std::mutex mu;
@@ -391,7 +398,9 @@ bool ClusterDiscovery::upsertCluster(ClusterInfo & cluster_info)
391398
return true;
392399
};
393400

394-
if (!cluster_info.current_node_is_observer && !contains(node_uuids, current_node_name))
401+
if (!cluster_info.current_node_is_observer
402+
&& context->isSwarmModeEnabled()
403+
&& !contains(node_uuids, current_node_name))
395404
{
396405
LOG_ERROR(log, "Can't find current node in cluster '{}', will register again", cluster_info.name);
397406
registerInZk(zk, cluster_info);
@@ -455,12 +464,30 @@ void ClusterDiscovery::registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & inf
455464
return;
456465
}
457466

467+
if (!context->isSwarmModeEnabled())
468+
{
469+
LOG_DEBUG(log, "STOP SWARM MODE called, skip self-registering current node {} in cluster {}", current_node_name, info.name);
470+
return;
471+
}
472+
458473
LOG_DEBUG(log, "Registering current node {} in cluster {}", current_node_name, info.name);
459474

460475
zk->createOrUpdate(node_path, info.current_node.serialize(), zkutil::CreateMode::Ephemeral);
461476
LOG_DEBUG(log, "Current node {} registered in cluster {}", current_node_name, info.name);
462477
}
463478

479+
/// Removes the entry of the current node from the shards list of cluster `info` in ZooKeeper.
/// Does nothing when the current node is an observer of the cluster.
/// An entry that is already absent is treated as success, so a single missing node
/// does not raise an exception and interrupt a caller that unregisters from several clusters in a row.
/// Any other ZooKeeper error is still reported as an exception.
void ClusterDiscovery::unregisterFromZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info)
{
    if (info.current_node_is_observer)
        return;

    String node_path = getShardsListPath(info.zk_root) / current_node_name;
    LOG_DEBUG(log, "Removing current node {} from cluster {}", current_node_name, info.name);

    /// tryRemove reports a missing node through the return code instead of throwing.
    const auto code = zk->tryRemove(node_path);
    if (code == Coordination::Error::ZNONODE)
    {
        LOG_DEBUG(log, "Current node {} is not registered in cluster {}, nothing to remove", current_node_name, info.name);
        return;
    }
    if (code != Coordination::Error::ZOK)
        throw zkutil::KeeperException::fromPath(code, node_path);

    LOG_DEBUG(log, "Current node {} removed from cluster {}", current_node_name, info.name);
}
490+
464491
void ClusterDiscovery::initialUpdate()
465492
{
466493
LOG_DEBUG(log, "Initializing");
@@ -506,6 +533,18 @@ void ClusterDiscovery::initialUpdate()
506533
is_initialized = true;
507534
}
508535

536+
void ClusterDiscovery::registerAll()
537+
{
538+
register_change_flag = RegisterChangeFlag::RCF_REGISTER_ALL;
539+
clusters_to_update->wakeup();
540+
}
541+
542+
void ClusterDiscovery::unregisterAll()
543+
{
544+
register_change_flag = RegisterChangeFlag::RCF_UNREGISTER_ALL;
545+
clusters_to_update->wakeup();
546+
}
547+
509548
void ClusterDiscovery::findDynamicClusters(
510549
std::unordered_map<String, ClusterDiscovery::ClusterInfo> & info,
511550
std::unordered_set<size_t> * unchanged_roots)
@@ -729,6 +768,27 @@ bool ClusterDiscovery::runMainThread(std::function<void()> up_to_date_callback)
729768
{
730769
up_to_date_callback();
731770
}
771+
772+
RegisterChangeFlag flag = register_change_flag.exchange(RegisterChangeFlag::RCF_NONE);
773+
774+
if (flag == RegisterChangeFlag::RCF_REGISTER_ALL)
775+
{
776+
LOG_DEBUG(log, "Register in all dynamic clusters");
777+
for (auto & [_, info] : clusters_info)
778+
{
779+
auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name);
780+
registerInZk(zk, info);
781+
}
782+
}
783+
else if (flag == RegisterChangeFlag::RCF_UNREGISTER_ALL)
784+
{
785+
LOG_DEBUG(log, "Unregister in all dynamic clusters");
786+
for (auto & [_, info] : clusters_info)
787+
{
788+
auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name);
789+
unregisterFromZk(zk, info);
790+
}
791+
}
732792
}
733793
LOG_DEBUG(log, "Worker thread stopped");
734794
return finished;

src/Interpreters/ClusterDiscovery.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ class ClusterDiscovery
3838

3939
~ClusterDiscovery();
4040

41+
void registerAll();
42+
void unregisterAll();
43+
4144
private:
4245
struct NodeInfo
4346
{
@@ -125,6 +128,7 @@ class ClusterDiscovery
125128
void initialUpdate();
126129

127130
void registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info);
131+
void unregisterFromZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info);
128132

129133
Strings getNodeNames(zkutil::ZooKeeperPtr & zk,
130134
const String & zk_root,
@@ -207,6 +211,15 @@ class ClusterDiscovery
207211
std::shared_ptr<std::vector<std::shared_ptr<MulticlusterDiscovery>>> multicluster_discovery_paths;
208212

209213
MultiVersion<Macros>::Version macros;
214+
215+
/// Pending request to change the registration of the current node in all dynamic clusters.
/// Scoped enum: the enumerators stay inside RegisterChangeFlag and do not convert to integers implicitly.
enum class RegisterChangeFlag
{
    RCF_NONE,           /// No request is pending.
    RCF_REGISTER_ALL,   /// Register the current node in all dynamic clusters.
    RCF_UNREGISTER_ALL, /// Unregister the current node from all dynamic clusters.
};
221+
222+
std::atomic<RegisterChangeFlag> register_change_flag = RegisterChangeFlag::RCF_NONE;
210223
};
211224

212225
}

src/Interpreters/Context.cpp

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ namespace CurrentMetrics
224224
extern const Metric UncompressedCacheCells;
225225
extern const Metric IndexUncompressedCacheBytes;
226226
extern const Metric IndexUncompressedCacheCells;
227+
extern const Metric IsSwarmModeEnabled;
227228
}
228229

229230

@@ -620,6 +621,7 @@ struct ContextSharedPart : boost::noncopyable
620621
std::map<String, UInt16> server_ports;
621622

622623
std::atomic<bool> shutdown_called = false;
624+
std::atomic<bool> swarm_mode_enabled = true;
623625

624626
Stopwatch uptime_watch TSA_GUARDED_BY(mutex);
625627

@@ -788,6 +790,8 @@ struct ContextSharedPart : boost::noncopyable
788790
*/
789791
void shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS
790792
{
793+
swarm_mode_enabled = false;
794+
CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 0);
791795
bool is_shutdown_called = shutdown_called.exchange(true);
792796
if (is_shutdown_called)
793797
return;
@@ -4825,7 +4829,6 @@ std::shared_ptr<Cluster> Context::getCluster(const std::string & cluster_name) c
48254829
throw Exception(ErrorCodes::CLUSTER_DOESNT_EXIST, "Requested cluster '{}' not found", cluster_name);
48264830
}
48274831

4828-
48294832
std::shared_ptr<Cluster> Context::tryGetCluster(const std::string & cluster_name) const
48304833
{
48314834
std::shared_ptr<Cluster> res = nullptr;
@@ -4844,6 +4847,21 @@ std::shared_ptr<Cluster> Context::tryGetCluster(const std::string & cluster_name
48444847
return res;
48454848
}
48464849

4850+
void Context::unregisterInAutodiscoveryClusters()
4851+
{
4852+
std::lock_guard lock(shared->clusters_mutex);
4853+
if (!shared->cluster_discovery)
4854+
return;
4855+
shared->cluster_discovery->unregisterAll();
4856+
}
4857+
4858+
void Context::registerInAutodiscoveryClusters()
4859+
{
4860+
std::lock_guard lock(shared->clusters_mutex);
4861+
if (!shared->cluster_discovery)
4862+
return;
4863+
shared->cluster_discovery->registerAll();
4864+
}
48474865

48484866
void Context::reloadClusterConfig() const
48494867
{
@@ -5754,12 +5772,35 @@ void Context::stopServers(const ServerType & server_type) const
57545772
shared->stop_servers_callback(server_type);
57555773
}
57565774

5757-
57585775
void Context::shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS
57595776
{
57605777
shared->shutdown();
57615778
}
57625779

5780+
bool Context::stopSwarmMode()
5781+
{
5782+
bool expected_is_enabled = true;
5783+
bool is_stopped_now = shared->swarm_mode_enabled.compare_exchange_strong(expected_is_enabled, false);
5784+
if (is_stopped_now)
5785+
CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 0);
5786+
// return true if stop successful
5787+
return is_stopped_now;
5788+
}
5789+
5790+
bool Context::startSwarmMode()
5791+
{
5792+
bool expected_is_enabled = false;
5793+
bool is_started_now = shared->swarm_mode_enabled.compare_exchange_strong(expected_is_enabled, true);
5794+
if (is_started_now)
5795+
CurrentMetrics::set(CurrentMetrics::IsSwarmModeEnabled, 1);
5796+
// return true if start successful
5797+
return is_started_now;
5798+
}
5799+
5800+
bool Context::isSwarmModeEnabled() const
5801+
{
5802+
return shared->swarm_mode_enabled;
5803+
}
57635804

57645805
Context::ApplicationType Context::getApplicationType() const
57655806
{

src/Interpreters/Context.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1318,6 +1318,8 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
13181318
size_t getClustersVersion() const;
13191319

13201320
void startClusterDiscovery();
1321+
void registerInAutodiscoveryClusters();
1322+
void unregisterInAutodiscoveryClusters();
13211323

13221324
/// Sets custom cluster, but doesn't update configuration
13231325
void setCluster(const String & cluster_name, const std::shared_ptr<Cluster> & cluster);
@@ -1433,6 +1435,15 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
14331435

14341436
void shutdown();
14351437

1438+
/// Stop some works to allow graceful shutdown later.
1439+
/// Returns true if swarm mode was enabled and has been disabled by this call.
1440+
bool stopSwarmMode();
1441+
/// Resume some works if we change our mind.
1442+
/// Returns true if swarm mode was disabled and has been enabled by this call.
1443+
bool startSwarmMode();
1444+
/// Return current swarm mode state.
1445+
bool isSwarmModeEnabled() const;
1446+
14361447
bool isInternalQuery() const { return is_internal_query; }
14371448
void setInternalQuery(bool internal) { is_internal_query = internal; }
14381449

src/Interpreters/InterpreterSystemQuery.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -702,6 +702,20 @@ BlockIO InterpreterSystemQuery::execute()
702702
case Type::START_MOVES:
703703
startStopAction(ActionLocks::PartsMove, true);
704704
break;
705+
case Type::STOP_SWARM_MODE:
706+
{
707+
getContext()->checkAccess(AccessType::SYSTEM_SWARM);
708+
if (getContext()->stopSwarmMode())
709+
getContext()->unregisterInAutodiscoveryClusters();
710+
break;
711+
}
712+
case Type::START_SWARM_MODE:
713+
{
714+
getContext()->checkAccess(AccessType::SYSTEM_SWARM);
715+
if (getContext()->startSwarmMode())
716+
getContext()->registerInAutodiscoveryClusters();
717+
break;
718+
}
705719
case Type::STOP_FETCHES:
706720
startStopAction(ActionLocks::PartsFetch, false);
707721
break;
@@ -1643,6 +1657,12 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
16431657
required_access.emplace_back(AccessType::SYSTEM_MOVES, query.getDatabase(), query.getTable());
16441658
break;
16451659
}
1660+
case Type::STOP_SWARM_MODE:
1661+
case Type::START_SWARM_MODE:
1662+
{
1663+
required_access.emplace_back(AccessType::SYSTEM_SWARM);
1664+
break;
1665+
}
16461666
case Type::STOP_PULLING_REPLICATION_LOG:
16471667
case Type::START_PULLING_REPLICATION_LOG:
16481668
{

src/Parsers/ASTSystemQuery.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -510,6 +510,8 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti
510510
case Type::DROP_PAGE_CACHE:
511511
case Type::STOP_REPLICATED_DDL_QUERIES:
512512
case Type::START_REPLICATED_DDL_QUERIES:
513+
case Type::STOP_SWARM_MODE:
514+
case Type::START_SWARM_MODE:
513515
break;
514516
case Type::UNKNOWN:
515517
case Type::END:

0 commit comments

Comments
 (0)