Skip to content

Commit 05010e8

Browse files
authored
Merge pull request #1414 from Altinity/frontport/antalya-26.1/rendezvous_hashing
26.1 Antalya port - improvements for cluster requests
2 parents 18ac1a4 + 7e9b163 commit 05010e8

File tree

69 files changed

+1996
-108
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

69 files changed

+1996
-108
lines changed

docs/en/sql-reference/statements/system.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,12 @@ Normally shuts down ClickHouse (like `service clickhouse-server stop` / `kill {$
220220
221221
Aborts ClickHouse process (like `kill -9 {$ pid_clickhouse-server}`)
222222
223+
## SYSTEM PRESHUTDOWN {#preshutdown}
224+
225+
<CloudNotSupportedBadge/>
226+
227+
Prepare node for graceful shutdown. Unregister in autodiscovered clusters, stop accepting distributed requests to object storages (s3Cluster, icebergCluster, etc.).
228+
223229
## SYSTEM INSTRUMENT {#instrument}
224230
225231
Manages instrumentation points using LLVM's XRay feature which is available when ClickHouse is built using `ENABLE_XRAY=1`.

programs/server/Server.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2570,6 +2570,8 @@ try
25702570

25712571
}
25722572

2573+
global_context->startSwarmMode();
2574+
25732575
{
25742576
std::lock_guard lock(servers_lock);
25752577
/// We should start interserver communications before (and more important shutdown after) tables.
@@ -3042,6 +3044,8 @@ try
30423044

30433045
is_cancelled = true;
30443046

3047+
global_context->stopSwarmMode();
3048+
30453049
LOG_DEBUG(log, "Waiting for current connections to close.");
30463050

30473051
size_t current_connections = 0;

src/Access/Common/AccessType.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,7 @@ enum class AccessType : uint8_t
353353
M(SYSTEM_TTL_MERGES, "SYSTEM STOP TTL MERGES, SYSTEM START TTL MERGES, STOP TTL MERGES, START TTL MERGES", TABLE, SYSTEM) \
354354
M(SYSTEM_FETCHES, "SYSTEM STOP FETCHES, SYSTEM START FETCHES, STOP FETCHES, START FETCHES", TABLE, SYSTEM) \
355355
M(SYSTEM_MOVES, "SYSTEM STOP MOVES, SYSTEM START MOVES, STOP MOVES, START MOVES", TABLE, SYSTEM) \
356+
M(SYSTEM_SWARM, "SYSTEM STOP SWARM MODE, SYSTEM START SWARM MODE, STOP SWARM MODE, START SWARM MODE", GLOBAL, SYSTEM) \
356357
M(SYSTEM_PULLING_REPLICATION_LOG, "SYSTEM STOP PULLING REPLICATION LOG, SYSTEM START PULLING REPLICATION LOG", TABLE, SYSTEM) \
357358
M(SYSTEM_CLEANUP, "SYSTEM STOP CLEANUP, SYSTEM START CLEANUP", TABLE, SYSTEM) \
358359
M(SYSTEM_VIEWS, "SYSTEM REFRESH VIEW, SYSTEM START VIEWS, SYSTEM STOP VIEWS, SYSTEM START VIEW, SYSTEM STOP VIEW, SYSTEM CANCEL VIEW, REFRESH VIEW, START VIEWS, STOP VIEWS, START VIEW, STOP VIEW, CANCEL VIEW", VIEW, SYSTEM) \

src/Client/MultiplexedConnections.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ void MultiplexedConnections::sendIgnoredPartUUIDs(const std::vector<UUID> & uuid
232232
void MultiplexedConnections::sendClusterFunctionReadTaskResponse(const ClusterFunctionReadTaskResponse & response)
233233
{
234234
std::lock_guard lock(cancel_mutex);
235-
if (cancelled)
235+
if (cancelled || !current_connection || !current_connection->isConnected())
236236
return;
237237
current_connection->sendClusterFunctionReadTaskResponse(response);
238238
}
@@ -241,7 +241,7 @@ void MultiplexedConnections::sendClusterFunctionReadTaskResponse(const ClusterFu
241241
void MultiplexedConnections::sendMergeTreeReadTaskResponse(const ParallelReadResponse & response)
242242
{
243243
std::lock_guard lock(cancel_mutex);
244-
if (cancelled)
244+
if (cancelled || !current_connection || !current_connection->isConnected())
245245
return;
246246
current_connection->sendMergeTreeReadTaskResponse(response);
247247
}
@@ -527,9 +527,12 @@ MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForRead
527527

528528
void MultiplexedConnections::invalidateReplica(ReplicaState & state)
529529
{
530+
Connection * old_connection = state.connection;
530531
state.connection = nullptr;
531532
state.pool_entry = IConnectionPool::Entry();
532533
--active_connection_count;
534+
if (current_connection == old_connection)
535+
current_connection = nullptr;
533536
}
534537

535538
void MultiplexedConnections::setAsyncCallback(AsyncCallback async_callback)

src/Common/CurrentMetrics.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,7 @@
461461
M(StartupScriptsExecutionState, "State of startup scripts execution: 0 = not finished, 1 = success, 2 = failure.") \
462462
\
463463
M(IsServerShuttingDown, "Indicates if the server is shutting down: 0 = no, 1 = yes") \
464+
M(IsSwarmModeEnabled, "Indicates if the swarm mode enabled or not: 0 = disabled, 1 = enabled") \
464465
\
465466
M(StatelessWorkerThreads, "Number of threads in the stateless worker thread pool.") \
466467
M(StatelessWorkerThreadsActive, "Number of threads in the stateless worker thread pool running a task.") \

src/Common/ProfileEvents.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1298,8 +1298,13 @@ The server successfully detected this situation and will download merged part fr
12981298
M(RuntimeFilterRowsChecked, "Number of rows checked by JOIN Runtime Filters", ValueType::Number) \
12991299
M(RuntimeFilterRowsPassed, "Number of rows that passed (not filtered out by) JOIN Runtime Filters", ValueType::Number) \
13001300
M(RuntimeFilterRowsSkipped, "Number of rows in blocks that were skipped by JOIN Runtime Filters", ValueType::Number) \
1301+
\
1302+
M(ObjectStorageClusterSentToMatchedReplica, "Number of tasks in ObjectStorageCluster request sent to matched replica.", ValueType::Number) \
1303+
M(ObjectStorageClusterSentToNonMatchedReplica, "Number of tasks in ObjectStorageCluster request sent to non-matched replica.", ValueType::Number) \
1304+
M(ObjectStorageClusterProcessedTasks, "Number of processed tasks in ObjectStorageCluster request.", ValueType::Number) \
1305+
M(ObjectStorageClusterWaitingMicroseconds, "Time of waiting for tasks in ObjectStorageCluster request.", ValueType::Microseconds) \
13011306
M(ObjectStorageListObjectsCacheHits, "Number of times object storage list objects operation hit the cache.", ValueType::Number) \
1302-
M(ObjectStorageListObjectsCacheMisses, "Number of times object storage list objects operation miss the cache.", ValueType::Number) \
1307+
M(ObjectStorageListObjectsCacheMisses, "Number of times object storage list objects operation miss the cache.", ValueType::Number) \
13031308
M(ObjectStorageListObjectsCacheExactMatchHits, "Number of times object storage list objects operation hit the cache with an exact match.", ValueType::Number) \
13041309
M(ObjectStorageListObjectsCachePrefixMatchHits, "Number of times object storage list objects operation miss the cache using prefix matching.", ValueType::Number) \
13051310
M(ParquetMetaDataCacheHits, "Number of times the read from filesystem cache hit the cache.", ValueType::Number) \

src/Core/Protocol.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,10 @@ namespace Protocol
9696
MergeTreeReadTaskRequest = 16, /// Request from a MergeTree replica to a coordinator
9797
TimezoneUpdate = 17, /// Receive server's (session-wide) default timezone
9898
SSHChallenge = 18, /// Return challenge for SSH signature signing
99+
99100
MAX = SSHChallenge,
100101

102+
ConnectionLost = 255, /// Exception that occurred on the client side.
101103
};
102104

103105
/// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10

src/Core/Range.cpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,21 @@
22
#include <Core/Range.h>
33
#include <IO/Operators.h>
44
#include <IO/WriteBufferFromString.h>
5+
#include <IO/ReadBufferFromString.h>
56
#include <Common/FieldVisitorToString.h>
67
#include <Common/FieldAccurateComparison.h>
8+
#include <Common/Base64.h>
79

810

911
namespace DB
1012
{
1113

14+
namespace ErrorCodes
15+
{
16+
extern const int INCORRECT_DATA;
17+
};
18+
19+
1220
FieldRef::FieldRef(ColumnsWithTypeAndName * columns_, size_t row_idx_, size_t column_idx_)
1321
: Field((*(*columns_)[column_idx_].column)[row_idx_]), columns(columns_), row_idx(row_idx_), column_idx(column_idx_)
1422
{
@@ -151,6 +159,13 @@ bool Range::isInfinite() const
151159
return left.isNegativeInfinity() && right.isPositiveInfinity();
152160
}
153161

162+
/// [x, x]
163+
bool Range::isPoint() const
164+
{
165+
return fullBounded() && left_included && right_included && equals(left, right)
166+
&& !left.isNegativeInfinity() && !left.isPositiveInfinity();
167+
}
168+
154169
bool Range::intersectsRange(const Range & r) const
155170
{
156171
/// r to the left of me.
@@ -276,6 +291,32 @@ bool Range::nearByWith(const Range & r) const
276291
return false;
277292
}
278293

294+
String Range::serialize(bool base64) const
295+
{
296+
WriteBufferFromOwnString str;
297+
298+
str << left_included << right_included;
299+
writeFieldBinary(left, str);
300+
writeFieldBinary(right, str);
301+
302+
if (base64)
303+
return base64Encode(str.str());
304+
else
305+
return str.str();
306+
}
307+
308+
void Range::deserialize(const String & range, bool base64)
309+
{
310+
if (range.empty())
311+
throw Exception(ErrorCodes::INCORRECT_DATA, "Empty range dump");
312+
313+
ReadBufferFromOwnString str(base64 ? base64Decode(range) : range);
314+
315+
str >> left_included >> right_included;
316+
left = readFieldBinary(str);
317+
right = readFieldBinary(str);
318+
}
319+
279320
Range intersect(const Range & a, const Range & b)
280321
{
281322
Range res = Range::createWholeUniverse();

src/Core/Range.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ struct Range
9494

9595
bool isBlank() const;
9696

97+
bool isPoint() const;
98+
9799
bool intersectsRange(const Range & r) const;
98100

99101
bool containsRange(const Range & r) const;
@@ -114,6 +116,9 @@ struct Range
114116
bool nearByWith(const Range & r) const;
115117

116118
String toString() const;
119+
120+
String serialize(bool base64 = false) const;
121+
void deserialize(const String & range, bool base64 = false);
117122
};
118123

119124
Range intersect(const Range & a, const Range & b);

src/Core/Settings.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7544,6 +7544,19 @@ Default number of tasks for parallel reading in distributed query. Tasks are spr
75447544
DECLARE(Bool, distributed_plan_optimize_exchanges, true, R"(
75457545
Removes unnecessary exchanges in distributed query plan. Disable it for debugging.
75467546
)", 0) \
7547+
DECLARE(UInt64, lock_object_storage_task_distribution_ms, 500, R"(
7548+
In object storage distribution queries do not distribute tasks on non-prefetched nodes until prefetched node is active.
7549+
Determines how long the free executor node (one that finished processing all of it assigned tasks) should wait before "stealing" tasks from queue of currently busy executor nodes.
7550+
7551+
Possible values:
7552+
7553+
- 0 - steal tasks immediately after freeing up.
7554+
- >0 - wait for specified period of time before stealing tasks.
7555+
7556+
Having this `>0` helps with cache reuse and might improve overall query time.
7557+
Because busy node might have warmed-up caches for this specific task, while free node needs to fetch lots of data from S3.
7558+
Which might take longer than just waiting for the busy node and generate extra traffic.
7559+
)", EXPERIMENTAL) \
75477560
DECLARE(String, distributed_plan_force_exchange_kind, "", R"(
75487561
Force specified kind of Exchange operators between distributed query stages.
75497562
@@ -7591,6 +7604,9 @@ If the number of set bits in a runtime bloom filter exceeds this ratio the filte
75917604
)", EXPERIMENTAL) \
75927605
DECLARE(Bool, rewrite_in_to_join, false, R"(
75937606
Rewrite expressions like 'x IN subquery' to JOIN. This might be useful for optimizing the whole query with join reordering.
7607+
)", EXPERIMENTAL) \
7608+
DECLARE(Bool, allow_experimental_iceberg_read_optimization, true, R"(
7609+
Allow Iceberg read optimization based on Iceberg metadata.
75947610
)", EXPERIMENTAL) \
75957611
\
75967612
/** Experimental timeSeries* aggregate functions. */ \
@@ -7743,7 +7759,8 @@ Allow experimental database engine DataLakeCatalog with catalog_type = 'paimon_r
77437759
MAKE_OBSOLETE(M, Bool, use_json_alias_for_old_object_type, false) \
77447760
MAKE_OBSOLETE(M, Bool, describe_extend_object_types, false) \
77457761
MAKE_OBSOLETE(M, Bool, allow_experimental_object_type, false) \
7746-
MAKE_OBSOLETE(M, BoolAuto, insert_select_deduplicate, Field{"auto"})
7762+
MAKE_OBSOLETE(M, BoolAuto, insert_select_deduplicate, Field{"auto"}) \
7763+
MAKE_OBSOLETE(M, Bool, allow_retries_in_cluster_requests, false) \
77477764
/** The section above is for obsolete settings. Do not add anything there. */
77487765
#endif /// __CLION_IDE__
77497766

0 commit comments

Comments
 (0)