Skip to content

Commit 65afe6f

Browse files
Merge pull request ClickHouse#70659 from ClickHouse/auto-cluster-engines
Use cluster table functions automatically if parallel replicas are enabled
2 parents 48ede05 + 8d6ac55 commit 65afe6f

33 files changed

+378
-130
lines changed

src/Analyzer/Resolve/QueryAnalyzer.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
#include <Analyzer/Resolve/QueryExpressionsAliasVisitor.h>
6767
#include <Analyzer/Resolve/ReplaceColumnsVisitor.h>
6868
#include <Analyzer/Resolve/TableExpressionsAliasVisitor.h>
69+
#include <Analyzer/Resolve/TableFunctionsWithClusterAlternativesVisitor.h>
6970
#include <Analyzer/Resolve/TypoCorrection.h>
7071

7172
#include <Planner/PlannerActionsVisitor.h>
@@ -5564,6 +5565,11 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
55645565
TableExpressionsAliasVisitor table_expressions_visitor(scope);
55655566
table_expressions_visitor.visit(query_node_typed.getJoinTree());
55665567

5568+
TableFunctionsWithClusterAlternativesVisitor table_function_visitor;
5569+
table_function_visitor.visit(query_node);
5570+
if (!table_function_visitor.shouldReplaceWithClusterAlternatives() && scope.context->hasQueryContext())
5571+
scope.context->getQueryContext()->setSetting("parallel_replicas_for_cluster_engines", false);
5572+
55675573
initializeQueryJoinTreeNode(query_node_typed.getJoinTree(), scope);
55685574
scope.aliases.alias_name_to_table_expression_node = std::move(transitive_aliases);
55695575

src/Analyzer/Resolve/TableFunctionsWithClusterAlternativesVisitor.h

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#pragma once
2+
3+
#include <Analyzer/InDepthQueryTreeVisitor.h>
4+
#include <Analyzer/TableFunctionNode.h>
5+
#include <TableFunctions/TableFunctionFactory.h>
6+
#include <TableFunctions/TableFunctionFile.h>
7+
8+
namespace DB
9+
{
10+
11+
/// Read-only query tree visitor that tallies how many TABLE and TABLE_FUNCTION
/// nodes it is shown. The tallies are only exposed through
/// shouldReplaceWithClusterAlternatives().
class TableFunctionsWithClusterAlternativesVisitor : public InDepthQueryTreeVisitor<TableFunctionsWithClusterAlternativesVisitor, /*const_visitor=*/true>
{
public:
    /// Bumps the matching counter for table and table function nodes;
    /// nodes of any other type leave both counters untouched.
    void visitImpl(const QueryTreeNodePtr & node)
    {
        const auto node_type = node->getNodeType();

        if (node_type == QueryTreeNodeType::TABLE_FUNCTION)
        {
            ++table_functions_seen;
            return;
        }

        if (node_type == QueryTreeNodeType::TABLE)
            ++tables_seen;
    }

    /// Never prunes: answers true for every (parent, child) pair.
    /// NOTE(review): presumably this makes the base visitor descend into every child - confirm against InDepthQueryTreeVisitor.
    bool needChildVisit(const QueryTreeNodePtr &, const QueryTreeNodePtr &) { return true; }

    /// True only when exactly one table-like node (table or table function)
    /// has been counted so far.
    bool shouldReplaceWithClusterAlternatives() const
    {
        const size_t table_like_nodes = tables_seen + table_functions_seen;
        return table_like_nodes == 1;
    }

private:
    /// Number of QueryTreeNodeType::TABLE nodes counted.
    size_t tables_seen = 0;
    /// Number of QueryTreeNodeType::TABLE_FUNCTION nodes counted.
    size_t table_functions_seen = 0;
};
29+
30+
}

src/Core/Settings.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5824,11 +5824,12 @@ Build local plan for local replica
58245824
DECLARE(Bool, parallel_replicas_index_analysis_only_on_coordinator, true, R"(
58255825
Index analysis done only on replica-coordinator and skipped on other replicas. Effective only with enabled parallel_replicas_local_plan
58265826
)", BETA) \
5827-
\
58285827
DECLARE(Bool, parallel_replicas_only_with_analyzer, true, R"(
58295828
The analyzer should be enabled to use parallel replicas. With disabled analyzer query execution fallbacks to local execution, even if parallel reading from replicas is enabled. Using parallel replicas without the analyzer enabled is not supported
58305829
)", BETA) \
5831-
\
5830+
DECLARE(Bool, parallel_replicas_for_cluster_engines, true, R"(
5831+
Replace table function engines with their -Cluster alternatives
5832+
)", EXPERIMENTAL) \
58325833
DECLARE(Bool, allow_experimental_analyzer, true, R"(
58335834
Allow new query analyzer.
58345835
)", IMPORTANT) ALIAS(enable_analyzer) \

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
7070
{
7171
{"use_page_cache_with_distributed_cache", false, false, "New setting"},
7272
{"use_query_condition_cache", false, false, "New setting."},
73+
{"parallel_replicas_for_cluster_engines", false, true, "New setting."},
7374
});
7475
addSettingsChanges(settings_changes_history, "25.2",
7576
{

src/Databases/DatabaseS3.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ StoragePtr DatabaseS3::getTableImpl(const String & name, ContextPtr context_) co
138138
return nullptr;
139139

140140
/// TableFunctionS3 throws exceptions, if table cannot be created.
141-
auto table_storage = table_function->execute(function, context_, name);
141+
auto table_storage = table_function->execute(function, context_, name, /*cached_columns_=*/{}, /*use_global_context=*/false, /*is_insert_query=*/true);
142142
if (table_storage)
143143
addTable(name, table_storage);
144144

src/Interpreters/InterpreterCreateQuery.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1924,7 +1924,7 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
19241924
/// In case of CREATE AS table_function() query we should use global context
19251925
/// in storage creation because there will be no query context on server startup
19261926
/// and because storage lifetime is bigger than query context lifetime.
1927-
res = table_function->execute(table_function_ast, getContext(), create.getTable(), properties.columns, /*use_global_context=*/true);
1927+
res = table_function->execute(table_function_ast, getContext(), create.getTable(), properties.columns, /*use_global_context=*/true, /*is_insert_query=*/true);
19281928
res->renameInMemory({create.getDatabase(), create.getTable(), create.uuid});
19291929
}
19301930
else

src/Storages/IStorageCluster.cpp

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@
2424
#include <Storages/SelectQueryInfo.h>
2525
#include <Storages/StorageDictionary.h>
2626

27+
#include <algorithm>
2728
#include <memory>
2829
#include <string>
2930

31+
3032
namespace DB
3133
{
3234
namespace Setting
@@ -35,6 +37,9 @@ namespace Setting
3537
extern const SettingsBool async_query_sending_for_remote;
3638
extern const SettingsBool async_socket_for_remote;
3739
extern const SettingsBool skip_unavailable_shards;
40+
extern const SettingsBool parallel_replicas_local_plan;
41+
extern const SettingsString cluster_for_parallel_replicas;
42+
extern const SettingsNonZeroUInt64 max_parallel_replicas;
3843
}
3944

4045
IStorageCluster::IStorageCluster(
@@ -183,29 +188,45 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
183188
auto new_context = updateSettings(context->getSettingsRef());
184189
const auto & current_settings = new_context->getSettingsRef();
185190
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
191+
192+
auto max_replicas_to_use = static_cast<UInt64>(cluster->getShardsInfo().size());
193+
if (current_settings[Setting::max_parallel_replicas] > 1)
194+
max_replicas_to_use = std::min(max_replicas_to_use, current_settings[Setting::max_parallel_replicas].value);
195+
186196
for (const auto & shard_info : cluster->getShardsInfo())
187197
{
188-
auto try_results = shard_info.pool->getMany(timeouts, current_settings, PoolMode::GET_MANY);
189-
for (auto & try_result : try_results)
190-
{
191-
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
192-
std::vector<IConnectionPool::Entry>{try_result},
193-
queryToString(query_to_send),
194-
getOutputHeader(),
195-
new_context,
196-
/*throttler=*/nullptr,
197-
scalars,
198-
Tables(),
199-
processed_stage,
200-
extension);
201-
202-
remote_query_executor->setLogger(log);
203-
pipes.emplace_back(std::make_shared<RemoteSource>(
204-
remote_query_executor,
205-
add_agg_info,
206-
current_settings[Setting::async_socket_for_remote],
207-
current_settings[Setting::async_query_sending_for_remote]));
208-
}
198+
if (pipes.size() >= max_replicas_to_use)
199+
break;
200+
201+
/// We're taking all replicas as shards,
202+
/// so each shard will have only one address to connect to.
203+
auto try_results = shard_info.pool->getMany(
204+
timeouts,
205+
current_settings,
206+
PoolMode::GET_ONE,
207+
{},
208+
/*skip_unavailable_endpoints=*/true);
209+
210+
if (try_results.empty())
211+
continue;
212+
213+
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
214+
std::vector<IConnectionPool::Entry>{try_results.front()},
215+
queryToString(query_to_send),
216+
getOutputHeader(),
217+
new_context,
218+
/*throttler=*/nullptr,
219+
scalars,
220+
Tables(),
221+
processed_stage,
222+
extension);
223+
224+
remote_query_executor->setLogger(log);
225+
pipes.emplace_back(std::make_shared<RemoteSource>(
226+
remote_query_executor,
227+
add_agg_info,
228+
current_settings[Setting::async_socket_for_remote],
229+
current_settings[Setting::async_query_sending_for_remote]));
209230
}
210231

211232
auto pipe = Pipe::unitePipes(std::move(pipes));

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#include "Storages/ObjectStorage/StorageObjectStorageCluster.h"
22

33
#include <Common/Exception.h>
4+
#include <Common/StringUtils.h>
5+
46
#include <Core/Settings.h>
57
#include <Formats/FormatFactory.h>
68
#include <Parsers/queryToString.h>
@@ -10,7 +12,7 @@
1012
#include <Storages/VirtualColumnUtils.h>
1113
#include <Storages/ObjectStorage/Utils.h>
1214
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
13-
#include <Storages/extractTableFunctionArgumentsFromSelectQuery.h>
15+
#include <Storages/extractTableFunctionFromSelectQuery.h>
1416

1517

1618
namespace DB
@@ -86,7 +88,16 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
8688
const DB::StorageSnapshotPtr & storage_snapshot,
8789
const ContextPtr & context)
8890
{
89-
ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query);
91+
auto * table_function = extractTableFunctionFromSelectQuery(query);
92+
if (!table_function)
93+
{
94+
throw Exception(
95+
ErrorCodes::LOGICAL_ERROR,
96+
"Expected SELECT query from table function {}, got '{}'",
97+
configuration->getEngineName(), queryToString(query));
98+
}
99+
100+
auto * expression_list = table_function->arguments->as<ASTExpressionList>();
90101
if (!expression_list)
91102
{
92103
throw Exception(
@@ -105,25 +116,35 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
105116
configuration->getEngineName());
106117
}
107118

108-
ASTPtr cluster_name_arg = args.front();
109-
args.erase(args.begin());
110-
configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context, /*with_structure=*/true);
111-
args.insert(args.begin(), cluster_name_arg);
119+
if (!endsWith(table_function->name, "Cluster"))
120+
configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context, /*with_structure=*/true);
121+
else
122+
{
123+
ASTPtr cluster_name_arg = args.front();
124+
args.erase(args.begin());
125+
configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context, /*with_structure=*/true);
126+
args.insert(args.begin(), cluster_name_arg);
127+
}
112128
}
113129

114130
RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
115131
const ActionsDAG::Node * predicate, const ContextPtr & local_context) const
116132
{
117133
auto iterator = StorageObjectStorageSource::createFileIterator(
118134
configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
119-
local_context, predicate, virtual_columns, nullptr, local_context->getFileProgressCallback());
135+
local_context, predicate, virtual_columns, nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true);
120136

121137
auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String
122138
{
123139
auto object_info = iterator->next(0);
124-
if (object_info)
125-
return object_info->getPath();
126-
return "";
140+
if (!object_info)
141+
return "";
142+
143+
auto archive_object_info = std::dynamic_pointer_cast<StorageObjectStorageSource::ArchiveIterator::ObjectInfoInArchive>(object_info);
144+
if (archive_object_info)
145+
return archive_object_info->getPathToArchive();
146+
147+
return object_info->getPath();
127148
});
128149
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
129150
}

src/Storages/ObjectStorage/StorageObjectStorageSource.cpp

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -129,17 +129,25 @@ std::shared_ptr<IObjectIterator> StorageObjectStorageSource::createFileIterator(
129129
const ActionsDAG::Node * predicate,
130130
const NamesAndTypesList & virtual_columns,
131131
ObjectInfos * read_keys,
132-
std::function<void(FileProgress)> file_progress_callback)
132+
std::function<void(FileProgress)> file_progress_callback,
133+
bool ignore_archive_globs)
133134
{
135+
const bool is_archive = configuration->isArchive();
136+
134137
if (distributed_processing)
135-
return std::make_shared<ReadTaskIterator>(local_context->getReadTaskCallback(), local_context->getSettingsRef()[Setting::max_threads]);
138+
{
139+
auto distributed_iterator = std::make_unique<ReadTaskIterator>(local_context->getReadTaskCallback(), local_context->getSettingsRef()[Setting::max_threads]);
140+
141+
if (is_archive)
142+
return std::make_shared<ArchiveIterator>(object_storage, configuration, std::move(distributed_iterator), local_context, nullptr);
143+
144+
return distributed_iterator;
145+
}
136146

137147
if (configuration->isNamespaceWithGlobs())
138148
throw Exception(ErrorCodes::BAD_ARGUMENTS,
139149
"Expression can not have wildcards inside {} name", configuration->getNamespaceType());
140150

141-
const bool is_archive = configuration->isArchive();
142-
143151
std::unique_ptr<IObjectIterator> iterator;
144152
if (configuration->isPathWithGlobs())
145153
{
@@ -190,7 +198,7 @@ std::shared_ptr<IObjectIterator> StorageObjectStorageSource::createFileIterator(
190198

191199
if (is_archive)
192200
{
193-
return std::make_shared<ArchiveIterator>(object_storage, configuration, std::move(iterator), local_context, read_keys);
201+
return std::make_shared<ArchiveIterator>(object_storage, configuration, std::move(iterator), local_context, read_keys, ignore_archive_globs);
194202
}
195203

196204
return iterator;
@@ -539,9 +547,8 @@ std::unique_ptr<ReadBufferFromFileBase> StorageObjectStorageSource::createReadBu
539547
if (!object_info.metadata)
540548
{
541549
if (!use_cache)
542-
{
543550
return object_storage->readObject(StoredObject(object_info.getPath()), read_settings);
544-
}
551+
545552
object_info.metadata = object_storage->getObjectMetadata(object_info.getPath());
546553
}
547554

@@ -646,6 +653,7 @@ std::unique_ptr<ReadBufferFromFileBase> StorageObjectStorageSource::createReadBu
646653
impl->setReadUntilEnd();
647654
impl->prefetch(DEFAULT_PREFETCH_PRIORITY);
648655
}
656+
649657
return impl;
650658
}
651659

@@ -828,7 +836,7 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::KeysIterator::ne
828836
{
829837
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
830838
if (current_index >= keys.size())
831-
return {};
839+
return nullptr;
832840

833841
auto key = keys[current_index];
834842

@@ -898,16 +906,24 @@ StorageObjectStorageSource::ReadTaskIterator::ReadTaskIterator(
898906
for (auto & key_future : keys)
899907
{
900908
auto key = key_future.get();
901-
if (!key.empty())
902-
buffer.emplace_back(std::make_shared<ObjectInfo>(key, std::nullopt));
909+
if (key.empty())
910+
continue;
911+
912+
buffer.emplace_back(std::make_shared<ObjectInfo>(key, std::nullopt));
903913
}
904914
}
905915

906916
StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::ReadTaskIterator::next(size_t)
907917
{
908918
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
909919
if (current_index >= buffer.size())
910-
return std::make_shared<ObjectInfo>(callback());
920+
{
921+
auto key = callback();
922+
if (key.empty())
923+
return nullptr;
924+
925+
return std::make_shared<ObjectInfo>(key, std::nullopt);
926+
}
911927

912928
return buffer[current_index];
913929
}
@@ -938,7 +954,8 @@ StorageObjectStorageSource::ArchiveIterator::ArchiveIterator(
938954
ConfigurationPtr configuration_,
939955
std::unique_ptr<IObjectIterator> archives_iterator_,
940956
ContextPtr context_,
941-
ObjectInfos * read_keys_)
957+
ObjectInfos * read_keys_,
958+
bool ignore_archive_globs_)
942959
: WithContext(context_)
943960
, object_storage(object_storage_)
944961
, is_path_in_archive_with_globs(configuration_->isPathInArchiveWithGlobs())
@@ -947,6 +964,7 @@ StorageObjectStorageSource::ArchiveIterator::ArchiveIterator(
947964
, log(getLogger("ArchiveIterator"))
948965
, path_in_archive(is_path_in_archive_with_globs ? "" : configuration_->getPathInArchive())
949966
, read_keys(read_keys_)
967+
, ignore_archive_globs(ignore_archive_globs_)
950968
{
951969
}
952970

@@ -977,12 +995,15 @@ ObjectInfoPtr StorageObjectStorageSource::ArchiveIterator::next(size_t processor
977995
if (!archive_object)
978996
return {};
979997

998+
if (!archive_object->metadata)
999+
archive_object->metadata = object_storage->getObjectMetadata(archive_object->getPath());
1000+
9801001
archive_reader = createArchiveReader(archive_object);
9811002
file_enumerator = archive_reader->firstFile();
9821003
if (!file_enumerator)
9831004
continue;
9841005
}
985-
else if (!file_enumerator->nextFile())
1006+
else if (!file_enumerator->nextFile() || ignore_archive_globs)
9861007
{
9871008
file_enumerator.reset();
9881009
continue;

0 commit comments

Comments
 (0)