Skip to content

Commit 477887a

Browse files
authored
Merge pull request ClickHouse#77326 from BetterStackHQ/ah/stable-s3-filesystem-cache
Change object storage cluster table functions to prefer specific replicas to improve cache locality
2 parents 73c3d18 + 7d8ef02 commit 477887a

14 files changed

+248
-35
lines changed

src/QueryPipeline/RemoteQueryExecutor.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -748,8 +748,12 @@ void RemoteQueryExecutor::processReadTaskRequest()
748748
if (!extension || !extension->task_iterator)
749749
throw Exception(ErrorCodes::LOGICAL_ERROR, "Distributed task iterator is not initialized");
750750

751+
if (!extension->replica_info)
752+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Replica info is not initialized");
753+
751754
ProfileEvents::increment(ProfileEvents::ReadTaskRequestsReceived);
752-
auto response = (*extension->task_iterator)();
755+
756+
auto response = (*extension->task_iterator)(extension->replica_info->number_of_current_replica);
753757
connections->sendReadTaskResponse(response);
754758
}
755759

src/QueryPipeline/RemoteQueryExecutor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class RemoteQueryExecutorReadContext;
2828
class ParallelReplicasReadingCoordinator;
2929

3030
/// This is the same type as StorageS3Source::IteratorWrapper
31-
using TaskIterator = std::function<String()>;
31+
using TaskIterator = std::function<String(size_t)>;
3232

3333
/// This class allows one to launch queries on remote replicas of one shard and get results
3434
class RemoteQueryExecutor

src/Storages/IStorageCluster.cpp

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ class ReadFromCluster : public SourceStepWithFilter
9393

9494
std::optional<RemoteQueryExecutor::Extension> extension;
9595

96-
void createExtension(const ActionsDAG::Node * predicate);
96+
void createExtension(const ActionsDAG::Node * predicate, size_t number_of_replicas);
9797
ContextPtr updateSettings(const Settings & settings);
9898
};
9999

@@ -105,15 +105,19 @@ void ReadFromCluster::applyFilters(ActionDAGNodes added_filter_nodes)
105105
if (filter_actions_dag)
106106
predicate = filter_actions_dag->getOutputs().at(0);
107107

108-
createExtension(predicate);
108+
auto max_replicas_to_use = static_cast<UInt64>(cluster->getShardsInfo().size());
109+
if (context->getSettingsRef()[Setting::max_parallel_replicas] > 1)
110+
max_replicas_to_use = std::min(max_replicas_to_use, context->getSettingsRef()[Setting::max_parallel_replicas].value);
111+
112+
createExtension(predicate, max_replicas_to_use);
109113
}
110114

111-
void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
115+
void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate, size_t number_of_replicas)
112116
{
113117
if (extension)
114118
return;
115119

116-
extension = storage->getTaskIteratorExtension(predicate, context);
120+
extension = storage->getTaskIteratorExtension(predicate, context, number_of_replicas);
117121
}
118122

119123
/// The code executes on initiator
@@ -179,8 +183,6 @@ void IStorageCluster::read(
179183

180184
void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
181185
{
182-
createExtension(nullptr);
183-
184186
const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{};
185187
const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
186188

@@ -189,10 +191,13 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
189191
const auto & current_settings = new_context->getSettingsRef();
190192
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
191193

194+
size_t replica_index = 0;
192195
auto max_replicas_to_use = static_cast<UInt64>(cluster->getShardsInfo().size());
193196
if (current_settings[Setting::max_parallel_replicas] > 1)
194197
max_replicas_to_use = std::min(max_replicas_to_use, current_settings[Setting::max_parallel_replicas].value);
195198

199+
createExtension(nullptr, max_replicas_to_use);
200+
196201
for (const auto & shard_info : cluster->getShardsInfo())
197202
{
198203
if (pipes.size() >= max_replicas_to_use)
@@ -210,6 +215,8 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
210215
if (try_results.empty())
211216
continue;
212217

218+
IConnections::ReplicaInfo replica_info{ .number_of_current_replica = replica_index++ };
219+
213220
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
214221
std::vector<IConnectionPool::Entry>{try_results.front()},
215222
query_to_send->formatWithSecretsOneLine(),
@@ -220,7 +227,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
220227
Tables(),
221228
processed_stage,
222229
nullptr,
223-
extension);
230+
RemoteQueryExecutor::Extension{.task_iterator = extension->task_iterator, .replica_info = std::move(replica_info)});
224231

225232
remote_query_executor->setLogger(log);
226233
pipes.emplace_back(std::make_shared<RemoteSource>(

src/Storages/IStorageCluster.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class IStorageCluster : public IStorage
3535

3636
ClusterPtr getCluster(ContextPtr context) const;
3737
/// Query is needed for pruning by virtual columns (_file, _path)
38-
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const = 0;
38+
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const = 0;
3939

4040
QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;
4141

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <Storages/ObjectStorage/Utils.h>
1414
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
1515
#include <Storages/extractTableFunctionFromSelectQuery.h>
16+
#include <Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h>
1617

1718

1819
namespace DB
@@ -145,24 +146,19 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
145146
}
146147

147148
RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
148-
const ActionsDAG::Node * predicate, const ContextPtr & local_context) const
149+
const ActionsDAG::Node * predicate, const ContextPtr & local_context, const size_t number_of_replicas) const
149150
{
150151
auto iterator = StorageObjectStorageSource::createFileIterator(
151152
configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
152153
local_context, predicate, {}, virtual_columns, nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true, /*skip_object_metadata=*/true);
153154

154-
auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String
155-
{
156-
auto object_info = iterator->next(0);
157-
if (!object_info)
158-
return "";
155+
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, number_of_replicas);
159156

160-
auto archive_object_info = std::dynamic_pointer_cast<StorageObjectStorageSource::ArchiveIterator::ObjectInfoInArchive>(object_info);
161-
if (archive_object_info)
162-
return archive_object_info->getPathToArchive();
157+
auto callback = std::make_shared<TaskIterator>(
158+
[task_distributor](size_t number_of_current_replica) mutable -> String {
159+
return task_distributor->getNextTask(number_of_current_replica).value_or("");
160+
});
163161

164-
return object_info->getPath();
165-
});
166162
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
167163
}
168164

src/Storages/ObjectStorage/StorageObjectStorageCluster.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class StorageObjectStorageCluster : public IStorageCluster
2525
std::string getName() const override;
2626

2727
RemoteQueryExecutor::Extension getTaskIteratorExtension(
28-
const ActionsDAG::Node * predicate, const ContextPtr & context) const override;
28+
const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override;
2929

3030
String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context);
3131

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
#include "StorageObjectStorageStableTaskDistributor.h"
2+
#include <Common/SipHash.h>
3+
#include <consistent_hashing.h>
4+
#include <optional>
5+
6+
namespace DB
7+
{
8+
9+
/// Creates a distributor that hands out the objects produced by `iterator_`.
/// One (initially empty) queue of pending file paths is allocated for each of
/// the `number_of_replicas_` replicas; the iterator starts out as not exhausted.
StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
    std::shared_ptr<IObjectIterator> iterator_, size_t number_of_replicas_)
    : iterator(std::move(iterator_)), connection_to_files(number_of_replicas_), iterator_exhausted(false)
{
}
17+
18+
std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(size_t number_of_current_replica)
19+
{
20+
LOG_TRACE(
21+
log,
22+
"Received a new connection from replica {} looking for a file",
23+
number_of_current_replica
24+
);
25+
26+
// 1. Check pre-queued files first
27+
if (auto file = getPreQueuedFile(number_of_current_replica))
28+
return file;
29+
30+
// 2. Try to find a matching file from the iterator
31+
if (auto file = getMatchingFileFromIterator(number_of_current_replica))
32+
return file;
33+
34+
// 3. Process unprocessed files if iterator is exhausted
35+
return getAnyUnprocessedFile(number_of_current_replica);
36+
}
37+
38+
size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path)
39+
{
40+
return ConsistentHashing(sipHash64(file_path), connection_to_files.size());
41+
}
42+
43+
std::optional<String> StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t number_of_current_replica)
44+
{
45+
std::lock_guard lock(mutex);
46+
47+
auto & files = connection_to_files[number_of_current_replica];
48+
49+
while (!files.empty())
50+
{
51+
String next_file = files.back();
52+
files.pop_back();
53+
54+
auto it = unprocessed_files.find(next_file);
55+
if (it == unprocessed_files.end())
56+
continue;
57+
58+
unprocessed_files.erase(it);
59+
60+
LOG_TRACE(
61+
log,
62+
"Assigning pre-queued file {} to replica {}",
63+
next_file,
64+
number_of_current_replica
65+
);
66+
67+
return next_file;
68+
}
69+
70+
return std::nullopt;
71+
}
72+
73+
std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFileFromIterator(size_t number_of_current_replica)
74+
{
75+
{
76+
std::lock_guard lock(mutex);
77+
if (iterator_exhausted)
78+
return std::nullopt;
79+
}
80+
81+
while (true)
82+
{
83+
ObjectInfoPtr object_info;
84+
85+
{
86+
std::lock_guard lock(mutex);
87+
object_info = iterator->next(0);
88+
89+
if (!object_info)
90+
{
91+
iterator_exhausted = true;
92+
break;
93+
}
94+
}
95+
96+
String file_path;
97+
98+
auto archive_object_info = std::dynamic_pointer_cast<StorageObjectStorageSource::ArchiveIterator::ObjectInfoInArchive>(object_info);
99+
if (archive_object_info)
100+
{
101+
file_path = archive_object_info->getPathToArchive();
102+
}
103+
else
104+
{
105+
file_path = object_info->getPath();
106+
}
107+
108+
size_t file_replica_idx = getReplicaForFile(file_path);
109+
if (file_replica_idx == number_of_current_replica)
110+
{
111+
LOG_TRACE(
112+
log,
113+
"Found file {} for replica {}",
114+
file_path,
115+
number_of_current_replica
116+
);
117+
118+
return file_path;
119+
}
120+
121+
// Queue file for its assigned replica
122+
{
123+
std::lock_guard lock(mutex);
124+
unprocessed_files.insert(file_path);
125+
connection_to_files[file_replica_idx].push_back(file_path);
126+
}
127+
}
128+
129+
return std::nullopt;
130+
}
131+
132+
std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(size_t number_of_current_replica)
133+
{
134+
std::lock_guard lock(mutex);
135+
136+
if (!unprocessed_files.empty())
137+
{
138+
auto it = unprocessed_files.begin();
139+
String next_file = *it;
140+
unprocessed_files.erase(it);
141+
142+
LOG_TRACE(
143+
log,
144+
"Iterator exhausted. Assigning unprocessed file {} to replica {}",
145+
next_file,
146+
number_of_current_replica
147+
);
148+
149+
return next_file;
150+
}
151+
152+
return std::nullopt;
153+
}
154+
155+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
#pragma once
2+
3+
#include <Client/Connection.h>
4+
#include <Common/Logger.h>
5+
#include <Interpreters/Cluster.h>
6+
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
7+
#include <unordered_set>
8+
#include <vector>
9+
#include <mutex>
10+
#include <memory>
11+
12+
namespace DB
13+
{
14+
15+
/// Hands out file paths produced by an object iterator to a fixed number of
/// replicas. Each path is mapped to a replica index by consistent hashing of
/// the path, and a replica is served the files mapped to it before any other
/// file; files that are still unprocessed once the iterator is exhausted are
/// given to whichever replica asks next.
class StorageObjectStorageStableTaskDistributor
16+
{
17+
public:
18+
/// `iterator_` is the source of objects; `number_of_replicas_` is the number
/// of per-replica queues that are created.
StorageObjectStorageStableTaskDistributor(
19+
std::shared_ptr<IObjectIterator> iterator_,
20+
size_t number_of_replicas_);
21+
22+
/// Returns the next file path for the given replica, or std::nullopt when
/// there is nothing left to hand out.
std::optional<String> getNextTask(size_t number_of_current_replica);
23+
24+
private:
25+
/// Index of the replica that `file_path` is mapped to.
size_t getReplicaForFile(const String & file_path);
26+
/// Next unclaimed file from the queue of the given replica, if any.
std::optional<String> getPreQueuedFile(size_t number_of_current_replica);
27+
/// Advances the iterator until a file mapped to the given replica is found;
/// files mapped to other replicas are queued for them.
std::optional<String> getMatchingFileFromIterator(size_t number_of_current_replica);
28+
/// Any file that is still recorded as unprocessed, if one exists.
std::optional<String> getAnyUnprocessedFile(size_t number_of_current_replica);
29+
30+
/// Source of the objects to distribute.
std::shared_ptr<IObjectIterator> iterator;
31+
32+
/// Queue of pending file paths per replica, indexed by replica number.
std::vector<std::vector<String>> connection_to_files;
33+
/// Paths that were queued for some replica and have not been handed out yet.
std::unordered_set<String> unprocessed_files;
34+
35+
/// Locked by the implementation while advancing the iterator and while
/// reading or modifying the queues, the set above and `iterator_exhausted`.
std::mutex mutex;
36+
/// Set once the iterator has returned a null object.
bool iterator_exhausted = false;
37+
38+
LoggerPtr log = getLogger("StorageClusterTaskDistributor");
39+
};
40+
41+
}

src/Storages/StorageDistributed.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1248,9 +1248,6 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
12481248
if (filter)
12491249
predicate = filter->getOutputs().at(0);
12501250

1251-
/// Select query is needed for pruining on virtual columns
1252-
auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context);
1253-
12541251
auto dst_cluster = getCluster();
12551252

12561253
auto new_query = std::dynamic_pointer_cast<ASTInsertQuery>(query.clone());
@@ -1277,8 +1274,14 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
12771274
const auto & current_settings = query_context->getSettingsRef();
12781275
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
12791276

1280-
/// Here we take addresses from destination cluster and assume source table exists on these nodes
12811277
const auto cluster = getCluster();
1278+
1279+
/// Select query is needed for pruning on virtual columns
1280+
auto number_of_replicas = static_cast<UInt64>(cluster->getShardsInfo().size());
1281+
auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, number_of_replicas);
1282+
1283+
/// Here we take addresses from destination cluster and assume source table exists on these nodes
1284+
size_t replica_index = 0;
12821285
for (const auto & replicas : cluster->getShardsInfo())
12831286
{
12841287
/// Skip unavailable hosts if necessary
@@ -1287,6 +1290,8 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
12871290
/// There will be only one replica, because we consider each replica as a shard
12881291
for (const auto & try_result : try_results)
12891292
{
1293+
IConnections::ReplicaInfo replica_info{ .number_of_current_replica = replica_index++ };
1294+
12901295
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
12911296
std::vector<IConnectionPool::Entry>{try_result},
12921297
new_query_str,
@@ -1297,7 +1302,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
12971302
Tables{},
12981303
QueryProcessingStage::Complete,
12991304
nullptr,
1300-
extension);
1305+
RemoteQueryExecutor::Extension{.task_iterator = extension.task_iterator, .replica_info = std::move(replica_info)});
13011306

13021307
QueryPipeline remote_pipeline(std::make_shared<RemoteSource>(
13031308
remote_query_executor, false, settings[Setting::async_socket_for_remote], settings[Setting::async_query_sending_for_remote]));

0 commit comments

Comments
 (0)