
Commit 6777073

Merge branch 'antalya-25.8' into frontport/antalya-25.8/optimize_count_in_datalake

2 parents: 016aac6 + 3a9c065


50 files changed: +1102 / -81 lines

docs/en/sql-reference/statements/system.md

Lines changed: 6 additions & 0 deletions
@@ -206,6 +206,12 @@ SYSTEM RELOAD USERS [ON CLUSTER cluster_name]
 
 Normally shuts down ClickHouse (like `service clickhouse-server stop` / `kill {$pid_clickhouse-server}`)
 
+## PRESHUTDOWN {#preshutdown}
+
+<CloudNotSupportedBadge/>
+
+Prepare node for graceful shutdown. Unregister in autodiscovered clusters, stop accepting distributed requests to object storages (s3Cluster, icebergCluster, etc.).
+
 ## KILL {#kill}
 
 Aborts ClickHouse process (like `kill -9 {$ pid_clickhouse-server}`)

programs/server/Server.cpp

Lines changed: 4 additions & 0 deletions
@@ -2329,6 +2329,8 @@ try
 
     }
 
+    global_context->startSwarmMode();
+
     {
         std::lock_guard lock(servers_lock);
         /// We should start interserver communications before (and more important shutdown after) tables.
@@ -2777,6 +2779,8 @@ try
 
     is_cancelled = true;
 
+    global_context->stopSwarmMode();
+
     LOG_DEBUG(log, "Waiting for current connections to close.");
 
     size_t current_connections = 0;
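
Context::startSwarmMode() and stopSwarmMode() are called here, but their implementation is not among the files shown in this commit. Below is a hypothetical, self-contained sketch of such a toggle, assuming it flips the flag behind isSwarmModeEnabled() (consulted by ClusterDiscovery further down in this diff), tracks the IsSwarmModeEnabled metric added in CurrentMetrics.cpp, and drives the node's registration in dynamic clusters. Only the names isSwarmModeEnabled, IsSwarmModeEnabled, registerAll and unregisterAll come from this commit; everything else (MockContext, MockClusterDiscovery) is invented for illustration.

/// Hypothetical sketch -- not the actual Context implementation from this commit.
#include <atomic>
#include <iostream>

struct MockClusterDiscovery
{
    void registerAll()   { std::cout << "register node in all dynamic clusters\n"; }
    void unregisterAll() { std::cout << "remove node from all dynamic clusters\n"; }
};

struct MockContext
{
    std::atomic<bool> swarm_mode_enabled{true};   /// assumed flag behind isSwarmModeEnabled()
    MockClusterDiscovery discovery;

    bool isSwarmModeEnabled() const { return swarm_mode_enabled.load(); }

    void startSwarmMode()            /// called at the end of server startup (Server.cpp above)
    {
        swarm_mode_enabled = true;
        discovery.registerAll();     /// also: CurrentMetrics IsSwarmModeEnabled -> 1
    }

    void stopSwarmMode()             /// called before waiting for connections to close
    {
        swarm_mode_enabled = false;
        discovery.unregisterAll();   /// also: CurrentMetrics IsSwarmModeEnabled -> 0
    }
};

int main()
{
    MockContext ctx;
    ctx.stopSwarmMode();             /// what SYSTEM PRESHUTDOWN / server shutdown would trigger
    std::cout << std::boolalpha << ctx.isSwarmModeEnabled() << "\n";   /// prints: false
    return 0;
}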

src/Access/Common/AccessType.h

Lines changed: 1 addition & 0 deletions
@@ -334,6 +334,7 @@ enum class AccessType : uint8_t
     M(SYSTEM_TTL_MERGES, "SYSTEM STOP TTL MERGES, SYSTEM START TTL MERGES, STOP TTL MERGES, START TTL MERGES", TABLE, SYSTEM) \
     M(SYSTEM_FETCHES, "SYSTEM STOP FETCHES, SYSTEM START FETCHES, STOP FETCHES, START FETCHES", TABLE, SYSTEM) \
     M(SYSTEM_MOVES, "SYSTEM STOP MOVES, SYSTEM START MOVES, STOP MOVES, START MOVES", TABLE, SYSTEM) \
+    M(SYSTEM_SWARM, "SYSTEM STOP SWARM MODE, SYSTEM START SWARM MODE, STOP SWARM MODE, START SWARM MODE", GLOBAL, SYSTEM) \
     M(SYSTEM_PULLING_REPLICATION_LOG, "SYSTEM STOP PULLING REPLICATION LOG, SYSTEM START PULLING REPLICATION LOG", TABLE, SYSTEM) \
     M(SYSTEM_CLEANUP, "SYSTEM STOP CLEANUP, SYSTEM START CLEANUP", TABLE, SYSTEM) \
     M(SYSTEM_VIEWS, "SYSTEM REFRESH VIEW, SYSTEM START VIEWS, SYSTEM STOP VIEWS, SYSTEM START VIEW, SYSTEM STOP VIEW, SYSTEM CANCEL VIEW, REFRESH VIEW, START VIEWS, STOP VIEWS, START VIEW, STOP VIEW, CANCEL VIEW", VIEW, SYSTEM) \

src/Common/CurrentMetrics.cpp

Lines changed: 1 addition & 0 deletions
@@ -432,6 +432,7 @@
     M(StartupScriptsExecutionState, "State of startup scripts execution: 0 = not finished, 1 = success, 2 = failure.") \
     \
     M(IsServerShuttingDown, "Indicates if the server is shutting down: 0 = no, 1 = yes") \
+    M(IsSwarmModeEnabled, "Indicates if the swarm mode enabled or not: 0 = disabled, 1 = enabled") \
     \
     M(StatelessWorkerThreads, "Number of threads in the stateless worker thread pool.") \
     M(StatelessWorkerThreadsActive, "Number of threads in the stateless worker thread pool running a task.") \

src/Core/Protocol.h

Lines changed: 2 additions & 0 deletions
@@ -96,8 +96,10 @@ namespace Protocol
     MergeTreeReadTaskRequest = 16, /// Request from a MergeTree replica to a coordinator
     TimezoneUpdate = 17, /// Receive server's (session-wide) default timezone
     SSHChallenge = 18, /// Return challenge for SSH signature signing
+
     MAX = SSHChallenge,
 
+    ConnectionLost = 255, /// Exception that occurred on the client side.
 };
 
 /// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10

src/Core/Settings.cpp

Lines changed: 16 additions & 0 deletions
@@ -7010,6 +7010,19 @@ Default number of tasks for parallel reading in distributed query. Tasks are spr
     DECLARE(Bool, distributed_plan_optimize_exchanges, true, R"(
 Removes unnecessary exchanges in distributed query plan. Disable it for debugging.
 )", 0) \
+    DECLARE(UInt64, lock_object_storage_task_distribution_ms, 500, R"(
+In object storage distribution queries do not distribute tasks on non-prefetched nodes until prefetched node is active.
+Determines how long the free executor node (one that finished processing all of it assigned tasks) should wait before "stealing" tasks from queue of currently busy executor nodes.
+
+Possible values:
+
+- 0 - steal tasks immediately after freeing up.
+- >0 - wait for specified period of time before stealing tasks.
+
+Having this `>0` helps with cache reuse and might improve overall query time.
+Because busy node might have warmed-up caches for this specific task, while free node needs to fetch lots of data from S3.
+Which might take longer than just waiting for the busy node and generate extra traffic.
+)", EXPERIMENTAL) \
     DECLARE(String, distributed_plan_force_exchange_kind, "", R"(
 Force specified kind of Exchange operators between distributed query stages.
 
@@ -7036,6 +7049,9 @@ Use Shuffle aggregation strategy instead of PartialAggregation + Merge in distri
 )", EXPERIMENTAL) \
     DECLARE(Bool, allow_experimental_iceberg_read_optimization, true, R"(
 Allow Iceberg read optimization based on Iceberg metadata.
+)", EXPERIMENTAL) \
+    DECLARE(Bool, allow_retries_in_cluster_requests, false, R"(
+Allow retries in cluster request, when one node goes offline
 )", EXPERIMENTAL) \
     \
     /** Experimental timeSeries* aggregate functions. */ \
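
The lock_object_storage_task_distribution_ms description above boils down to a single timing decision on the side that hands out tasks: a free executor may only take ("steal") a task that was prefetched for another, still-active executor once the lock period has expired. A minimal sketch of that decision follows; all names (AssignedTask, mayStealTask) are invented for illustration, since the real distributor code is not part of the hunks shown here.

/// Hypothetical sketch of the decision described by the setting text above.
#include <cstdint>

struct AssignedTask
{
    uint64_t assigned_at_us;     /// when the busy node was given the task (microseconds)
    bool owner_is_active;        /// the node the task was prefetched for is still connected
};

bool mayStealTask(const AssignedTask & task, uint64_t now_us, uint64_t lock_ms)
{
    if (!task.owner_is_active)
        return true;                                          /// owner went away: steal immediately
    if (lock_ms == 0)
        return true;                                          /// 0 means "steal immediately after freeing up"
    return (now_us - task.assigned_at_us) >= lock_ms * 1000;  /// otherwise wait out the lock period
}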

src/Core/SettingsChangesHistory.cpp

Lines changed: 9 additions & 0 deletions
@@ -42,8 +42,10 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
     addSettingsChanges(settings_changes_history, "25.8.9.2000",
     {
         {"allow_experimental_iceberg_read_optimization", true, true, "New setting."},
+        {"lock_object_storage_task_distribution_ms", 500, 500, "Raised the value to 500 to avoid hoping tasks between executors."},
         {"object_storage_cluster", "", "", "Antalya: New setting"},
         {"object_storage_max_nodes", 0, 0, "Antalya: New setting"},
+        {"allow_retries_in_cluster_requests", false, false, "Antalya: New setting"},
     });
     addSettingsChanges(settings_changes_history, "25.8",
     {
@@ -138,6 +140,13 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
         {"distributed_plan_force_shuffle_aggregation", 0, 0, "New experimental setting"},
         {"allow_experimental_insert_into_iceberg", false, false, "New setting."},
         /// RELEASE CLOSED
+        {"allow_experimental_database_iceberg", false, true, "Turned ON by default for Antalya"},
+        {"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"},
+        {"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya"},
+        {"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"},
+        {"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
+        {"object_storage_cluster", "", "", "New setting"},
+        {"object_storage_max_nodes", 0, 0, "New setting"},
     });
     addSettingsChanges(settings_changes_history, "25.6.5.2000",
     {

src/Disks/ObjectStorages/IObjectStorage.cpp

Lines changed: 37 additions & 0 deletions
@@ -9,6 +9,11 @@
 #include <Common/ObjectStorageKeyGenerator.h>
 #include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
 
+#include <Poco/JSON/Object.h>
+#include <Poco/JSON/Parser.h>
+#include <Poco/JSON/JSONException.h>
+
+
 namespace DB
 {
 
@@ -111,4 +116,36 @@ std::string RelativePathWithMetadata::getPathOrPathToArchiveIfArchive() const
     return getPath();
 }
 
+RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string & task)
+{
+    Poco::JSON::Parser parser;
+    try
+    {
+        auto json = parser.parse(task).extract<Poco::JSON::Object::Ptr>();
+        if (!json)
+            return;
+
+        successfully_parsed = true;
+
+        if (json->has("retry_after_us"))
+            retry_after_us = json->getValue<size_t>("retry_after_us");
+    }
+    catch (const Poco::JSON::JSONException &)
+    { /// Not a JSON
+        return;
+    }
+}
+
+std::string RelativePathWithMetadata::CommandInTaskResponse::to_string() const
+{
+    Poco::JSON::Object json;
+    if (retry_after_us.has_value())
+        json.set("retry_after_us", retry_after_us.value());
+
+    std::ostringstream oss;
+    oss.exceptions(std::ios::failbit);
+    Poco::JSON::Stringifier::stringify(json, oss);
+    return oss.str();
+}
+
 }

src/Disks/ObjectStorages/IObjectStorage.h

Lines changed: 30 additions & 0 deletions
@@ -116,13 +116,33 @@ struct DataLakeObjectMetadata;
 
 struct RelativePathWithMetadata
 {
+    class CommandInTaskResponse
+    {
+    public:
+        CommandInTaskResponse() = default;
+        explicit CommandInTaskResponse(const std::string & task);
+
+        bool is_parsed() const { return successfully_parsed; }
+        void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; }
+
+        std::string to_string() const;
+
+        std::optional<Poco::Timestamp::TimeDiff> get_retry_after_us() const { return retry_after_us; }
+
+    private:
+        bool successfully_parsed = false;
+        std::optional<Poco::Timestamp::TimeDiff> retry_after_us;
+    };
+
     String relative_path;
     /// Object metadata: size, modification time, etc.
     std::optional<ObjectMetadata> metadata;
     /// Delta lake related object metadata.
     std::optional<DataLakeObjectMetadata> data_lake_metadata;
     /// Information about columns
     std::optional<DataFileMetaInfoPtr> file_meta_info;
+    /// Retry request after short pause
+    CommandInTaskResponse command;
 
     RelativePathWithMetadata() = default;
 
@@ -132,6 +152,14 @@ struct RelativePathWithMetadata
     {}
     explicit RelativePathWithMetadata(const DataFileInfo & info, std::optional<ObjectMetadata> metadata_ = std::nullopt);
 
+    explicit RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt)
+        : metadata(std::move(metadata_))
+        , command(task_string)
+    {
+        if (!command.is_parsed())
+            relative_path = task_string;
+    }
+
     RelativePathWithMetadata(const RelativePathWithMetadata & other) = default;
 
     virtual ~RelativePathWithMetadata() = default;
@@ -145,6 +173,8 @@ struct RelativePathWithMetadata
 
     void setFileMetaInfo(std::optional<DataFileMetaInfoPtr> file_meta_info_ ) { file_meta_info = file_meta_info_; }
     std::optional<DataFileMetaInfoPtr> getFileMetaInfo() const { return file_meta_info; }
+
+    const CommandInTaskResponse & getCommand() const { return command; }
 };
 
 struct ObjectKeyWithMetadata
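
CommandInTaskResponse lets the side handing out tasks answer with a small JSON command (currently just retry_after_us) on the same string channel that otherwise carries an object path; the new RelativePathWithMetadata(task_string, ...) constructor falls back to treating the string as a plain path when it does not parse as JSON. A minimal round-trip sketch using only the members declared above; the surrounding coordinator/worker framing is assumed, not taken from this commit.

/// Round-trip sketch for CommandInTaskResponse; the coordinator/worker framing is assumed.
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <iostream>

int main()
{
    using CommandInTaskResponse = DB::RelativePathWithMetadata::CommandInTaskResponse;

    /// "Coordinator" side: instead of an object path, send a command asking the caller
    /// to come back in 100 ms.
    CommandInTaskResponse reply;
    reply.set_retry_after_us(100'000);
    const std::string task = reply.to_string();   /// JSON such as {"retry_after_us":100000}

    /// "Worker" side: try to interpret the received task string as a command first.
    CommandInTaskResponse parsed(task);
    if (parsed.is_parsed() && parsed.get_retry_after_us())
        std::cout << "retry in " << *parsed.get_retry_after_us() << " us\n";
    else
        std::cout << "plain object path: " << task << "\n";
    return 0;
}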

src/Interpreters/ClusterDiscovery.cpp

Lines changed: 61 additions & 1 deletion
@@ -108,6 +108,13 @@ class ClusterDiscovery::Flags
         cv.notify_one();
     }
 
+    void wakeup()
+    {
+        std::unique_lock<std::mutex> lk(mu);
+        any_need_update = true;
+        cv.notify_one();
+    }
+
 private:
     std::condition_variable cv;
     std::mutex mu;
@@ -391,7 +398,9 @@ bool ClusterDiscovery::upsertCluster(ClusterInfo & cluster_info)
         return true;
     };
 
-    if (!cluster_info.current_node_is_observer && !contains(node_uuids, current_node_name))
+    if (!cluster_info.current_node_is_observer
+        && context->isSwarmModeEnabled()
+        && !contains(node_uuids, current_node_name))
     {
         LOG_ERROR(log, "Can't find current node in cluster '{}', will register again", cluster_info.name);
         registerInZk(zk, cluster_info);
@@ -455,12 +464,30 @@ void ClusterDiscovery::registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & inf
         return;
     }
 
+    if (!context->isSwarmModeEnabled())
+    {
+        LOG_DEBUG(log, "STOP SWARM MODE called, skip self-registering current node {} in cluster {}", current_node_name, info.name);
+        return;
+    }
+
     LOG_DEBUG(log, "Registering current node {} in cluster {}", current_node_name, info.name);
 
     zk->createOrUpdate(node_path, info.current_node.serialize(), zkutil::CreateMode::Ephemeral);
     LOG_DEBUG(log, "Current node {} registered in cluster {}", current_node_name, info.name);
 }
 
+void ClusterDiscovery::unregisterFromZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info)
+{
+    if (info.current_node_is_observer)
+        return;
+
+    String node_path = getShardsListPath(info.zk_root) / current_node_name;
+    LOG_DEBUG(log, "Removing current node {} from cluster {}", current_node_name, info.name);
+
+    zk->remove(node_path);
+    LOG_DEBUG(log, "Current node {} removed from cluster {}", current_node_name, info.name);
+}
+
 void ClusterDiscovery::initialUpdate()
 {
     LOG_DEBUG(log, "Initializing");
@@ -506,6 +533,18 @@
     is_initialized = true;
 }
 
+void ClusterDiscovery::registerAll()
+{
+    register_change_flag = RegisterChangeFlag::RCF_REGISTER_ALL;
+    clusters_to_update->wakeup();
+}
+
+void ClusterDiscovery::unregisterAll()
+{
+    register_change_flag = RegisterChangeFlag::RCF_UNREGISTER_ALL;
+    clusters_to_update->wakeup();
+}
+
 void ClusterDiscovery::findDynamicClusters(
     std::unordered_map<String, ClusterDiscovery::ClusterInfo> & info,
     std::unordered_set<size_t> * unchanged_roots)
@@ -729,6 +768,27 @@ bool ClusterDiscovery::runMainThread(std::function<void()> up_to_date_callback)
         {
             up_to_date_callback();
         }
+
+        RegisterChangeFlag flag = register_change_flag.exchange(RegisterChangeFlag::RCF_NONE);
+
+        if (flag == RegisterChangeFlag::RCF_REGISTER_ALL)
+        {
+            LOG_DEBUG(log, "Register in all dynamic clusters");
+            for (auto & [_, info] : clusters_info)
+            {
+                auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name);
+                registerInZk(zk, info);
+            }
+        }
+        else if (flag == RegisterChangeFlag::RCF_UNREGISTER_ALL)
+        {
+            LOG_DEBUG(log, "Unregister in all dynamic clusters");
+            for (auto & [_, info] : clusters_info)
+            {
+                auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name);
+                unregisterFromZk(zk, info);
+            }
+        }
     }
     LOG_DEBUG(log, "Worker thread stopped");
     return finished;
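
registerAll() and unregisterAll() do no ZooKeeper work themselves: they store a flag (register_change_flag, consumed with exchange() in runMainThread) and wake the discovery worker via the new Flags::wakeup(), so the actual (un)registration happens on the worker's next iteration. Below is a self-contained sketch of that hand-off, with free functions and globals standing in for the real class members; it is a simplification, not the actual ClusterDiscovery code.

/// Simplified sketch of the flag-and-wakeup hand-off used above.
#include <atomic>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

enum class RegisterChangeFlag { RCF_NONE, RCF_REGISTER_ALL, RCF_UNREGISTER_ALL };

std::mutex mu;
std::condition_variable cv;
bool any_need_update = false;
std::atomic<RegisterChangeFlag> register_change_flag{RegisterChangeFlag::RCF_NONE};

void wakeup()                      /// mirrors Flags::wakeup() above
{
    std::unique_lock<std::mutex> lk(mu);
    any_need_update = true;
    cv.notify_one();
}

void unregisterAll()               /// mirrors ClusterDiscovery::unregisterAll() above
{
    register_change_flag = RegisterChangeFlag::RCF_UNREGISTER_ALL;
    wakeup();
}

int main()
{
    std::thread worker([]
    {
        std::unique_lock<std::mutex> lk(mu);
        cv.wait(lk, [] { return any_need_update; });            /// discovery thread wakes up...
        auto flag = register_change_flag.exchange(RegisterChangeFlag::RCF_NONE);
        if (flag == RegisterChangeFlag::RCF_UNREGISTER_ALL)     /// ...and does the ZooKeeper work
            std::cout << "would remove ephemeral node from every dynamic cluster\n";
    });

    unregisterAll();   /// e.g. triggered by SYSTEM STOP SWARM MODE / SYSTEM PRESHUTDOWN
    worker.join();
    return 0;
}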
