Skip to content

Commit 515cf56

Browse files
authored
Merge branch 'antalya-25.8' into frontport/antalya-25.8/fix_remote_calls
2 parents 2fea358 + 3a9c065 commit 515cf56

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+1100
-85
lines changed

docs/en/sql-reference/statements/system.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,12 @@ SYSTEM RELOAD USERS [ON CLUSTER cluster_name]
206206
207207
Normally shuts down ClickHouse (like `service clickhouse-server stop` / `kill {$pid_clickhouse-server}`)
208208
209+
## PRESHUTDOWN {#preshutdown}
210+
211+
<CloudNotSupportedBadge/>
212+
213+
Prepares the node for a graceful shutdown: unregisters it from autodiscovered clusters and stops accepting distributed requests to object storages (s3Cluster, icebergCluster, etc.).
214+
209215
## KILL {#kill}
210216
211217
Aborts ClickHouse process (like `kill -9 {$ pid_clickhouse-server}`)

programs/server/Server.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2329,6 +2329,8 @@ try
23292329

23302330
}
23312331

2332+
global_context->startSwarmMode();
2333+
23322334
{
23332335
std::lock_guard lock(servers_lock);
23342336
/// We should start interserver communications before (and more important shutdown after) tables.
@@ -2777,6 +2779,8 @@ try
27772779

27782780
is_cancelled = true;
27792781

2782+
global_context->stopSwarmMode();
2783+
27802784
LOG_DEBUG(log, "Waiting for current connections to close.");
27812785

27822786
size_t current_connections = 0;

src/Access/Common/AccessType.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,7 @@ enum class AccessType : uint8_t
334334
M(SYSTEM_TTL_MERGES, "SYSTEM STOP TTL MERGES, SYSTEM START TTL MERGES, STOP TTL MERGES, START TTL MERGES", TABLE, SYSTEM) \
335335
M(SYSTEM_FETCHES, "SYSTEM STOP FETCHES, SYSTEM START FETCHES, STOP FETCHES, START FETCHES", TABLE, SYSTEM) \
336336
M(SYSTEM_MOVES, "SYSTEM STOP MOVES, SYSTEM START MOVES, STOP MOVES, START MOVES", TABLE, SYSTEM) \
337+
M(SYSTEM_SWARM, "SYSTEM STOP SWARM MODE, SYSTEM START SWARM MODE, STOP SWARM MODE, START SWARM MODE", GLOBAL, SYSTEM) \
337338
M(SYSTEM_PULLING_REPLICATION_LOG, "SYSTEM STOP PULLING REPLICATION LOG, SYSTEM START PULLING REPLICATION LOG", TABLE, SYSTEM) \
338339
M(SYSTEM_CLEANUP, "SYSTEM STOP CLEANUP, SYSTEM START CLEANUP", TABLE, SYSTEM) \
339340
M(SYSTEM_VIEWS, "SYSTEM REFRESH VIEW, SYSTEM START VIEWS, SYSTEM STOP VIEWS, SYSTEM START VIEW, SYSTEM STOP VIEW, SYSTEM CANCEL VIEW, REFRESH VIEW, START VIEWS, STOP VIEWS, START VIEW, STOP VIEW, CANCEL VIEW", VIEW, SYSTEM) \

src/Common/CurrentMetrics.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,7 @@
432432
M(StartupScriptsExecutionState, "State of startup scripts execution: 0 = not finished, 1 = success, 2 = failure.") \
433433
\
434434
M(IsServerShuttingDown, "Indicates if the server is shutting down: 0 = no, 1 = yes") \
435+
M(IsSwarmModeEnabled, "Indicates if the swarm mode enabled or not: 0 = disabled, 1 = enabled") \
435436
\
436437
M(StatelessWorkerThreads, "Number of threads in the stateless worker thread pool.") \
437438
M(StatelessWorkerThreadsActive, "Number of threads in the stateless worker thread pool running a task.") \

src/Core/Protocol.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,10 @@ namespace Protocol
9696
MergeTreeReadTaskRequest = 16, /// Request from a MergeTree replica to a coordinator
9797
TimezoneUpdate = 17, /// Receive server's (session-wide) default timezone
9898
SSHChallenge = 18, /// Return challenge for SSH signature signing
99+
99100
MAX = SSHChallenge,
100101

102+
ConnectionLost = 255, /// Exception that occurred on the client side.
101103
};
102104

103105
/// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10

src/Core/Settings.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7026,6 +7026,19 @@ Default number of tasks for parallel reading in distributed query. Tasks are spr
70267026
DECLARE(Bool, distributed_plan_optimize_exchanges, true, R"(
70277027
Removes unnecessary exchanges in distributed query plan. Disable it for debugging.
70287028
)", 0) \
7029+
DECLARE(UInt64, lock_object_storage_task_distribution_ms, 500, R"(
7030+
In object storage distribution queries do not distribute tasks on non-prefetched nodes until prefetched node is active.
7031+
Determines how long the free executor node (one that finished processing all of it assigned tasks) should wait before "stealing" tasks from queue of currently busy executor nodes.
7032+
7033+
Possible values:
7034+
7035+
- 0 - steal tasks immediately after freeing up.
7036+
- >0 - wait for specified period of time before stealing tasks.
7037+
7038+
Having this `>0` helps with cache reuse and might improve overall query time.
7039+
Because busy node might have warmed-up caches for this specific task, while free node needs to fetch lots of data from S3.
7040+
Which might take longer than just waiting for the busy node and generate extra traffic.
7041+
)", EXPERIMENTAL) \
70297042
DECLARE(String, distributed_plan_force_exchange_kind, "", R"(
70307043
Force specified kind of Exchange operators between distributed query stages.
70317044
@@ -7049,6 +7062,9 @@ DECLARE(Bool, allow_experimental_ytsaurus_dictionary_source, false, R"(
70497062
)", EXPERIMENTAL) \
70507063
DECLARE(Bool, distributed_plan_force_shuffle_aggregation, false, R"(
70517064
Use Shuffle aggregation strategy instead of PartialAggregation + Merge in distributed query plan.
7065+
)", EXPERIMENTAL) \
7066+
DECLARE(Bool, allow_retries_in_cluster_requests, false, R"(
7067+
Allow retries in cluster request, when one node goes offline
70527068
)", EXPERIMENTAL) \
70537069
\
70547070
/** Experimental timeSeries* aggregate functions. */ \

src/Core/SettingsChangesHistory.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,10 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
4242
addSettingsChanges(settings_changes_history, "25.8.9.2000",
4343
{
4444
{"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
45+
{"lock_object_storage_task_distribution_ms", 500, 500, "Raised the value to 500 to avoid hoping tasks between executors."},
4546
{"object_storage_cluster", "", "", "Antalya: New setting"},
4647
{"object_storage_max_nodes", 0, 0, "Antalya: New setting"},
48+
{"allow_retries_in_cluster_requests", false, false, "Antalya: New setting"},
4749
});
4850
addSettingsChanges(settings_changes_history, "25.8",
4951
{
@@ -138,6 +140,13 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
138140
{"distributed_plan_force_shuffle_aggregation", 0, 0, "New experimental setting"},
139141
{"allow_experimental_insert_into_iceberg", false, false, "New setting."},
140142
/// RELEASE CLOSED
143+
{"allow_experimental_database_iceberg", false, true, "Turned ON by default for Antalya"},
144+
{"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"},
145+
{"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya"},
146+
{"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"},
147+
{"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
148+
{"object_storage_cluster", "", "", "New setting"},
149+
{"object_storage_max_nodes", 0, 0, "New setting"},
141150
});
142151
addSettingsChanges(settings_changes_history, "25.6.5.2000",
143152
{

src/Disks/ObjectStorages/IObjectStorage.cpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
#include <Common/Exception.h>
99
#include <Common/ObjectStorageKeyGenerator.h>
1010

11+
#include <Poco/JSON/Object.h>
12+
#include <Poco/JSON/Parser.h>
13+
#include <Poco/JSON/JSONException.h>
14+
1115

1216
namespace DB
1317
{
@@ -104,4 +108,36 @@ std::string RelativePathWithMetadata::getPathOrPathToArchiveIfArchive() const
104108
return getPath();
105109
}
106110

111+
/// Tries to interpret `task` as a JSON-encoded command rather than a plain object path.
///
/// On success `is_parsed()` returns true and the optional `retry_after_us` field is picked up.
/// If `task` is not a JSON object (or its fields cannot be read), the instance is left in its
/// default, not-parsed state so that the caller can treat `task` as an ordinary path.
RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string & task)
{
    Poco::JSON::Parser parser;
    try
    {
        auto json = parser.parse(task).extract<Poco::JSON::Object::Ptr>();
        if (!json)
            return;

        /// Read every field into a local first: the object must not be reported
        /// as parsed if extracting one of its fields fails half-way.
        std::optional<Poco::Timestamp::TimeDiff> parsed_retry_after_us;
        if (json->has("retry_after_us"))
            parsed_retry_after_us = json->getValue<size_t>("retry_after_us");

        retry_after_us = parsed_retry_after_us;
        successfully_parsed = true;
    }
    catch (const Poco::Exception &)
    {
        /// Not a JSON command. Besides Poco::JSON::JSONException (malformed JSON) this also covers
        /// the exceptions raised by `extract`/`getValue` when the text is valid JSON of another
        /// kind (a number, a boolean, an array, ...), which is perfectly possible for a file name.
        return;
    }
}
130+
131+
std::string RelativePathWithMetadata::CommandInTaskResponse::to_string() const
132+
{
133+
Poco::JSON::Object json;
134+
if (retry_after_us.has_value())
135+
json.set("retry_after_us", retry_after_us.value());
136+
137+
std::ostringstream oss;
138+
oss.exceptions(std::ios::failbit);
139+
Poco::JSON::Stringifier::stringify(json, oss);
140+
return oss.str();
141+
}
142+
107143
}

src/Disks/ObjectStorages/IObjectStorage.h

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,18 +111,41 @@ struct DataLakeObjectMetadata;
111111

112112
struct RelativePathWithMetadata
113113
{
114+
class CommandInTaskResponse
115+
{
116+
public:
117+
CommandInTaskResponse() = default;
118+
explicit CommandInTaskResponse(const std::string & task);
119+
120+
bool is_parsed() const { return successfully_parsed; }
121+
void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; }
122+
123+
std::string to_string() const;
124+
125+
std::optional<Poco::Timestamp::TimeDiff> get_retry_after_us() const { return retry_after_us; }
126+
127+
private:
128+
bool successfully_parsed = false;
129+
std::optional<Poco::Timestamp::TimeDiff> retry_after_us;
130+
};
131+
114132
String relative_path;
115133
/// Object metadata: size, modification time, etc.
116134
std::optional<ObjectMetadata> metadata;
117135
/// Delta lake related object metadata.
118136
std::optional<DataLakeObjectMetadata> data_lake_metadata;
137+
/// Command parsed from the task string instead of a path (e.g. retry the request after a short pause)
138+
CommandInTaskResponse command;
119139

120140
RelativePathWithMetadata() = default;
121141

122-
explicit RelativePathWithMetadata(String relative_path_, std::optional<ObjectMetadata> metadata_ = std::nullopt)
123-
: relative_path(std::move(relative_path_))
124-
, metadata(std::move(metadata_))
125-
{}
142+
explicit RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt)
143+
: metadata(std::move(metadata_))
144+
, command(task_string)
145+
{
146+
if (!command.is_parsed())
147+
relative_path = task_string;
148+
}
126149

127150
RelativePathWithMetadata(const RelativePathWithMetadata & other) = default;
128151

@@ -134,6 +157,8 @@ struct RelativePathWithMetadata
134157
virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
135158
virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
136159
virtual std::string getPathOrPathToArchiveIfArchive() const;
160+
161+
/// Read-only access to the `command` member.
const CommandInTaskResponse & getCommand() const { return command; }
137162
};
138163

139164
struct ObjectKeyWithMetadata

src/Interpreters/ClusterDiscovery.cpp

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,13 @@ class ClusterDiscovery::Flags
108108
cv.notify_one();
109109
}
110110

111+
void wakeup()
112+
{
113+
std::unique_lock<std::mutex> lk(mu);
114+
any_need_update = true;
115+
cv.notify_one();
116+
}
117+
111118
private:
112119
std::condition_variable cv;
113120
std::mutex mu;
@@ -391,7 +398,9 @@ bool ClusterDiscovery::upsertCluster(ClusterInfo & cluster_info)
391398
return true;
392399
};
393400

394-
if (!cluster_info.current_node_is_observer && !contains(node_uuids, current_node_name))
401+
if (!cluster_info.current_node_is_observer
402+
&& context->isSwarmModeEnabled()
403+
&& !contains(node_uuids, current_node_name))
395404
{
396405
LOG_ERROR(log, "Can't find current node in cluster '{}', will register again", cluster_info.name);
397406
registerInZk(zk, cluster_info);
@@ -455,12 +464,30 @@ void ClusterDiscovery::registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & inf
455464
return;
456465
}
457466

467+
if (!context->isSwarmModeEnabled())
468+
{
469+
LOG_DEBUG(log, "STOP SWARM MODE called, skip self-registering current node {} in cluster {}", current_node_name, info.name);
470+
return;
471+
}
472+
458473
LOG_DEBUG(log, "Registering current node {} in cluster {}", current_node_name, info.name);
459474

460475
zk->createOrUpdate(node_path, info.current_node.serialize(), zkutil::CreateMode::Ephemeral);
461476
LOG_DEBUG(log, "Current node {} registered in cluster {}", current_node_name, info.name);
462477
}
463478

479+
/// Removes the current node's entry from the shards list of cluster `info` in ZooKeeper.
/// Observers have no entry of their own, so nothing is done for them.
/// A missing entry is not an error: the node is simply already unregistered.
/// Any other ZooKeeper failure is reported as an exception.
void ClusterDiscovery::unregisterFromZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info)
{
    if (info.current_node_is_observer)
        return;

    String node_path = getShardsListPath(info.zk_root) / current_node_name;
    LOG_DEBUG(log, "Removing current node {} from cluster {}", current_node_name, info.name);

    /// Use the non-throwing variant: the caller iterates over all clusters, and an exception
    /// for one already-absent node would leave the remaining clusters registered.
    const auto code = zk->tryRemove(node_path);
    if (code == Coordination::Error::ZNONODE)
    {
        LOG_DEBUG(log, "Current node {} is not registered in cluster {}, nothing to remove", current_node_name, info.name);
        return;
    }
    if (code != Coordination::Error::ZOK)
        throw zkutil::KeeperException::fromPath(code, node_path);

    LOG_DEBUG(log, "Current node {} removed from cluster {}", current_node_name, info.name);
}
490+
464491
void ClusterDiscovery::initialUpdate()
465492
{
466493
LOG_DEBUG(log, "Initializing");
@@ -506,6 +533,18 @@ void ClusterDiscovery::initialUpdate()
506533
is_initialized = true;
507534
}
508535

536+
void ClusterDiscovery::registerAll()
537+
{
538+
register_change_flag = RegisterChangeFlag::RCF_REGISTER_ALL;
539+
clusters_to_update->wakeup();
540+
}
541+
542+
void ClusterDiscovery::unregisterAll()
543+
{
544+
register_change_flag = RegisterChangeFlag::RCF_UNREGISTER_ALL;
545+
clusters_to_update->wakeup();
546+
}
547+
509548
void ClusterDiscovery::findDynamicClusters(
510549
std::unordered_map<String, ClusterDiscovery::ClusterInfo> & info,
511550
std::unordered_set<size_t> * unchanged_roots)
@@ -729,6 +768,27 @@ bool ClusterDiscovery::runMainThread(std::function<void()> up_to_date_callback)
729768
{
730769
up_to_date_callback();
731770
}
771+
772+
RegisterChangeFlag flag = register_change_flag.exchange(RegisterChangeFlag::RCF_NONE);
773+
774+
if (flag == RegisterChangeFlag::RCF_REGISTER_ALL)
775+
{
776+
LOG_DEBUG(log, "Register in all dynamic clusters");
777+
for (auto & [_, info] : clusters_info)
778+
{
779+
auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name);
780+
registerInZk(zk, info);
781+
}
782+
}
783+
else if (flag == RegisterChangeFlag::RCF_UNREGISTER_ALL)
784+
{
785+
LOG_DEBUG(log, "Unregister in all dynamic clusters");
786+
for (auto & [_, info] : clusters_info)
787+
{
788+
auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name);
789+
unregisterFromZk(zk, info);
790+
}
791+
}
732792
}
733793
LOG_DEBUG(log, "Worker thread stopped");
734794
return finished;

0 commit comments

Comments
 (0)