Skip to content

Commit 213dc0a

Browse files
authored
Merge pull request #1042 from Altinity/bugfix/antalya-25.6.5/lock_object_storage_task_distribution_ms_lost_host
Antalya 25.6: Fix lock_object_storage_task_distribution_ms, Changed lock_object_storage_task_distribution_ms value to 500
2 parents a1c4e5e + d9f8a8d commit 213dc0a

File tree

4 files changed

+24
-7
lines changed

4 files changed

+24
-7
lines changed

src/Core/Settings.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6891,8 +6891,18 @@ Cache the list of objects returned by list objects calls in object storage
68916891
DECLARE(Bool, distributed_plan_optimize_exchanges, true, R"(
68926892
Removes unnecessary exchanges in distributed query plan. Disable it for debugging.
68936893
)", 0) \
6894-
DECLARE(UInt64, lock_object_storage_task_distribution_ms, 0, R"(
6895-
In object storage distribution queries do not distibute tasks on non-prefetched nodes until prefetched node is active.
6894+
DECLARE(UInt64, lock_object_storage_task_distribution_ms, 500, R"(
6895+
In object storage distribution queries do not distribute tasks on non-prefetched nodes until prefetched node is active.
6896+
Determines how long the free executor node (one that finished processing all of it assigned tasks) should wait before "stealing" tasks from queue of currently busy executor nodes.
6897+
6898+
Possible values:
6899+
6900+
- 0 - steal tasks immediately after freeing up.
6901+
- >0 - wait for specified period of time before stealing tasks.
6902+
6903+
Having this `>0` helps with cache reuse and might improve overall query time.
6904+
Because busy node might have warmed-up caches for this specific task, while free node needs to fetch lots of data from S3.
6905+
Which might take longer than just waiting for the busy node and generate extra traffic.
68966906
)", EXPERIMENTAL) \
68976907
DECLARE(String, distributed_plan_force_exchange_kind, "", R"(
68986908
Force specified kind of Exchange operators between distributed query stages.

src/Core/SettingsChangesHistory.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
6767
/// controls new feature and it's 'true' by default, use 'false' as previous_value).
6868
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
6969
/// Note: please check if the key already exists to prevent duplicate entries.
70-
addSettingsChanges(settings_changes_history, "25.6.5.2000",
70+
addSettingsChanges(settings_changes_history, "25.6.5.20364",
71+
{
72+
{"lock_object_storage_task_distribution_ms", 500, 500, "Raised the value to 500 to avoid hoping tasks between executors."},
73+
});
74+
addSettingsChanges(settings_changes_history, "25.6.5.20000",
7175
{
7276
{"allow_experimental_database_iceberg", false, true, "Turned ON by default for Antalya"},
7377
{"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"},

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFile
195195
// Queue file for its assigned replica
196196
{
197197
std::lock_guard lock(mutex);
198-
unprocessed_files[file_path] = number_of_current_replica;
198+
unprocessed_files[file_path] = file_replica_idx;
199199
connection_to_files[file_replica_idx].push_back(file_path);
200200
}
201201
}
@@ -286,7 +286,11 @@ void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_
286286

287287
replica_to_files_to_be_processed.erase(number_of_current_replica);
288288
for (const auto & file_path : processed_file_list_ptr->second)
289-
unprocessed_files[file_path] = getReplicaForFile(file_path);
289+
{
290+
auto file_replica_idx = getReplicaForFile(file_path);
291+
unprocessed_files[file_path] = file_replica_idx;
292+
connection_to_files[file_replica_idx].push_back(file_path);
293+
}
290294
}
291295

292296
}

tests/integration/test_s3_cache_locality/test.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,7 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
8686
"filesystem_cache_name": "'raw_s3_cache'",
8787
}
8888

89-
if lock_object_storage_task_distribution_ms > 0:
90-
settings["lock_object_storage_task_distribution_ms"] = lock_object_storage_task_distribution_ms
89+
settings["lock_object_storage_task_distribution_ms"] = lock_object_storage_task_distribution_ms
9190

9291
query_id_first = str(uuid.uuid4())
9392
result_first = node.query(

0 commit comments

Comments
 (0)