Skip to content

Commit f1cf71d

Browse files
committed
[WIP] Add stable file to node mapping in StorageObjectStorageCluster - add setting
1 parent 1cf5e4e commit f1cf71d

File tree

2 files changed

+26
-7
lines changed

2 files changed

+26
-7
lines changed

src/Core/Settings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,9 @@ Ignore absence of file if it does not exist when reading certain keys.
495495
Possible values:
496496
- 1 — `SELECT` returns empty result.
497497
- 0 — `SELECT` throws an exception.
498+
)", 0) \
499+
DECLARE(Bool, object_storage_stable_cluster_task_distribution, false, R"(
500+
Use stable task distribution for object storage cluster table functions in order to better utilize filesystem cache.
498501
)", 0) \
499502
DECLARE(Bool, hdfs_ignore_file_doesnt_exist, false, R"(
500503
Ignore absence of file if it does not exist when reading certain keys.

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ namespace DB
1919
namespace Setting
2020
{
2121
extern const SettingsBool use_hive_partitioning;
22+
extern const SettingsBool object_storage_stable_cluster_task_distribution;
2223
}
2324

2425
namespace ErrorCodes
@@ -120,14 +121,29 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
120121
local_context, predicate, virtual_columns, nullptr, local_context->getFileProgressCallback());
121122

122123
auto cluster = getCluster(local_context);
123-
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, cluster);
124124

125-
auto callback = std::make_shared<TaskIterator>(
126-
[task_distributor](Connection * connection) mutable -> String {
127-
return task_distributor->getNextTask(connection);
128-
});
129-
130-
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
125+
if (local_context->getSettingsRef()[Setting::object_storage_stable_cluster_task_distribution])
126+
{
127+
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, cluster);
128+
129+
auto callback = std::make_shared<TaskIterator>(
130+
[task_distributor](Connection * connection) mutable -> String {
131+
return task_distributor->getNextTask(connection);
132+
});
133+
134+
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
135+
}
136+
else
137+
{
138+
auto callback = std::make_shared<TaskIterator>(
139+
[iterator](Connection *) mutable -> String {
140+
if (auto object_info = iterator->next(0))
141+
return object_info->getPath();
142+
return "";
143+
});
144+
145+
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
146+
}
131147
}
132148

133149
}

0 commit comments

Comments
 (0)