-
Notifications
You must be signed in to change notification settings - Fork 17
Cluster Joins part 2 - global mode #1527
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: antalya-26.1
Are you sure you want to change the base?
Changes from 4 commits
4a70d3b
972f1ee
b6bf151
38e89f4
e15af07
7649388
cb5e474
0cd90a8
904d00e
d0de2cf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,13 +13,15 @@ | |
| #include <Interpreters/TranslateQualifiedNamesVisitor.h> | ||
| #include <Interpreters/InterpreterSelectQueryAnalyzer.h> | ||
| #include <Processors/Sources/RemoteSource.h> | ||
| #include <Processors/Executors/PullingPipelineExecutor.h> | ||
| #include <QueryPipeline/narrowPipe.h> | ||
| #include <QueryPipeline/Pipe.h> | ||
| #include <QueryPipeline/RemoteQueryExecutor.h> | ||
| #include <QueryPipeline/QueryPipelineBuilder.h> | ||
| #include <Storages/IStorage.h> | ||
| #include <Storages/SelectQueryInfo.h> | ||
| #include <Storages/extractTableFunctionFromSelectQuery.h> | ||
| #include <Storages/buildQueryTreeForShard.h> | ||
| #include <Planner/Utils.h> | ||
| #include <Analyzer/QueryTreeBuilder.h> | ||
| #include <Analyzer/QueryNode.h> | ||
|
|
@@ -104,11 +106,14 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisito | |
| using Base = InDepthQueryTreeVisitorWithContext<SearcherVisitor>; | ||
| using Base::Base; | ||
|
|
||
| explicit SearcherVisitor(std::unordered_set<QueryTreeNodeType> types_, ContextPtr context) : Base(context), types(types_) {} | ||
| explicit SearcherVisitor(std::unordered_set<QueryTreeNodeType> types_, size_t entry_, ContextPtr context) | ||
| : Base(context) | ||
| , types(types_) | ||
| , entry(entry_) {} | ||
|
|
||
| bool needChildVisit(QueryTreeNodePtr & /*parent*/, QueryTreeNodePtr & /*child*/) | ||
| { | ||
| return getSubqueryDepth() <= 2 && !passed_node; | ||
| return getSubqueryDepth() <= 2 && !passed_node && !current_entry; | ||
| } | ||
|
|
||
| void enterImpl(QueryTreeNodePtr & node) | ||
|
|
@@ -119,13 +124,19 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisito | |
| auto node_type = node->getNodeType(); | ||
|
|
||
| if (types.contains(node_type)) | ||
| passed_node = node; | ||
| { | ||
| ++current_entry; | ||
| if (current_entry == entry) | ||
| passed_node = node; | ||
| } | ||
| } | ||
|
|
||
| QueryTreeNodePtr getNode() const { return passed_node; } | ||
|
|
||
| private: | ||
| std::unordered_set<QueryTreeNodeType> types; | ||
| size_t entry; | ||
| size_t current_entry = 0; | ||
| QueryTreeNodePtr passed_node; | ||
| }; | ||
|
|
||
|
|
@@ -192,33 +203,42 @@ Converts | |
| localtable as t | ||
| ON s3.key == t.key | ||
|
|
||
| to | ||
| to (object_storage_cluster_join_mode='local') | ||
|
|
||
| SELECT s3.c1, s3.c2, s3.key | ||
| FROM | ||
| s3Cluster(...) AS s3 | ||
|
|
||
| or (object_storage_cluster_join_mode='global') | ||
|
|
||
| SELECT s3.c1, s3.c2, t.c3 | ||
| FROM | ||
| s3Cluster(...) as s3 | ||
| JOIN | ||
| values('key UInt32, data String', (1, 'one'), (2, 'two'), ...) as t | ||
| ON s3.key == t.key | ||
| */ | ||
| void IStorageCluster::updateQueryWithJoinToSendIfNeeded( | ||
| ASTPtr & query_to_send, | ||
| QueryTreeNodePtr query_tree, | ||
| SelectQueryInfo query_info, | ||
| const ContextPtr & context) | ||
| { | ||
| auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode]; | ||
| switch (object_storage_cluster_join_mode) | ||
| { | ||
| case ObjectStorageClusterJoinMode::LOCAL: | ||
| { | ||
| auto info = getQueryTreeInfo(query_tree, context); | ||
| auto info = getQueryTreeInfo(query_info.query_tree, context); | ||
|
|
||
| if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) | ||
| { | ||
| auto modified_query_tree = query_tree->clone(); | ||
| auto modified_query_tree = query_info.query_tree->clone(); | ||
|
|
||
| SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); | ||
| SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, 1, context); | ||
| left_table_expression_searcher.visit(modified_query_tree); | ||
| auto table_function_node = left_table_expression_searcher.getNode(); | ||
| if (!table_function_node) | ||
| throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node"); | ||
| throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find left table function node"); | ||
|
|
||
| QueryTreeNodePtr query_tree_distributed; | ||
|
|
||
|
|
@@ -231,7 +251,7 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( | |
| } | ||
| else if (info.has_cross_join) | ||
| { | ||
| SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, context); | ||
| SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, 1, context); | ||
| join_searcher.visit(modified_query_tree); | ||
| auto cross_join_node = join_searcher.getNode(); | ||
| if (!cross_join_node) | ||
|
|
@@ -286,8 +306,25 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded( | |
| return; | ||
| } | ||
| case ObjectStorageClusterJoinMode::GLOBAL: | ||
| // TODO | ||
| throw Exception(ErrorCodes::NOT_IMPLEMENTED, "`Global` mode for `object_storage_cluster_join_mode` setting is unimplemented for now"); | ||
| { | ||
| auto info = getQueryTreeInfo(query_info.query_tree, context); | ||
|
|
||
| if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) | ||
| { | ||
| auto modified_query_tree = query_info.query_tree->clone(); | ||
|
|
||
| rewriteJoinToGlobalJoin(modified_query_tree, context); | ||
| modified_query_tree = buildQueryTreeForShard( | ||
| query_info.planner_context, | ||
| modified_query_tree, | ||
| /*allow_global_join_for_right_table*/ true, | ||
| /*find_cross_join*/ true); | ||
| query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree); | ||
| send_external_tables = true; | ||
| } | ||
|
|
||
| return; | ||
| } | ||
| case ObjectStorageClusterJoinMode::ALLOW: // Do nothing special | ||
| return; | ||
| } | ||
|
|
@@ -316,7 +353,7 @@ void IStorageCluster::read( | |
| SharedHeader sample_block; | ||
| ASTPtr query_to_send = query_info.query; | ||
|
|
||
| updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context); | ||
| updateQueryWithJoinToSendIfNeeded(query_to_send, query_info, context); | ||
|
|
||
| if (context->getSettingsRef()[Setting::allow_experimental_analyzer]) | ||
| { | ||
|
|
@@ -343,6 +380,10 @@ void IStorageCluster::read( | |
|
|
||
| auto this_ptr = std::static_pointer_cast<IStorageCluster>(shared_from_this()); | ||
|
|
||
| std::optional<Tables> external_tables = std::nullopt; | ||
| if (send_external_tables) | ||
| external_tables = query_info.planner_context->getMutableQueryContext()->getExternalTables(); | ||
|
||
|
|
||
| auto reading = std::make_unique<ReadFromCluster>( | ||
| column_names, | ||
| query_info, | ||
|
|
@@ -353,7 +394,8 @@ void IStorageCluster::read( | |
| std::move(query_to_send), | ||
| processed_stage, | ||
| cluster, | ||
| log); | ||
| log, | ||
| external_tables); | ||
|
|
||
| query_plan.addStep(std::move(reading)); | ||
| } | ||
|
|
@@ -401,7 +443,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const | |
| new_context, | ||
| /*throttler=*/nullptr, | ||
| scalars, | ||
| Tables(), | ||
| external_tables.has_value() ? *external_tables : Tables(), | ||
| processed_stage, | ||
| nullptr, | ||
| RemoteQueryExecutor::Extension{.task_iterator = extension->task_iterator, .replica_info = std::move(replica_info)}); | ||
|
|
@@ -439,7 +481,7 @@ IStorageCluster::QueryTreeInfo IStorageCluster::getQueryTreeInfo(QueryTreeNodePt | |
| info.has_cross_join = true; | ||
| } | ||
|
|
||
| SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context); | ||
| SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, 1, context); | ||
| left_table_expression_searcher.visit(query_tree); | ||
| auto table_function_node = left_table_expression_searcher.getNode(); | ||
| if (!table_function_node) | ||
|
|
@@ -472,11 +514,14 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( | |
| { | ||
| if (!context->getSettingsRef()[Setting::allow_experimental_analyzer]) | ||
| throw Exception(ErrorCodes::NOT_IMPLEMENTED, | ||
| "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true"); | ||
| "object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=false"); | ||
|
||
|
|
||
| auto info = getQueryTreeInfo(query_info.query_tree, context); | ||
| if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) | ||
| return QueryProcessingStage::Enum::FetchColumns; | ||
| if (object_storage_cluster_join_mode == ObjectStorageClusterJoinMode::LOCAL) | ||
| { | ||
| auto info = getQueryTreeInfo(query_info.query_tree, context); | ||
| if (info.has_join || info.has_cross_join || info.has_local_columns_in_where) | ||
| return QueryProcessingStage::Enum::FetchColumns; | ||
| } | ||
| } | ||
|
|
||
| /// Initiator executes query on remote node. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -55,13 +55,14 @@ class IStorageCluster : public IStorage | |
| protected: | ||
| virtual void updateBeforeRead(const ContextPtr &) {} | ||
| virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {} | ||
| void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, QueryTreeNodePtr query_tree, const ContextPtr & context); | ||
| void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, SelectQueryInfo query_info, const ContextPtr & context); | ||
|
|
||
| virtual void updateConfigurationIfNeeded(ContextPtr /* context */) {} | ||
|
|
||
| private: | ||
| LoggerPtr log; | ||
| String cluster_name; | ||
| bool send_external_tables = false; | ||
|
||
|
|
||
| struct QueryTreeInfo | ||
| { | ||
|
|
@@ -91,7 +92,8 @@ class ReadFromCluster : public SourceStepWithFilter | |
| ASTPtr query_to_send_, | ||
| QueryProcessingStage::Enum processed_stage_, | ||
| ClusterPtr cluster_, | ||
| LoggerPtr log_) | ||
| LoggerPtr log_, | ||
| std::optional<Tables> external_tables_) | ||
| : SourceStepWithFilter( | ||
| std::move(sample_block), | ||
| column_names_, | ||
|
|
@@ -103,6 +105,7 @@ class ReadFromCluster : public SourceStepWithFilter | |
| , processed_stage(processed_stage_) | ||
| , cluster(std::move(cluster_)) | ||
| , log(log_) | ||
| , external_tables(external_tables_) | ||
| { | ||
| } | ||
|
|
||
|
|
@@ -114,6 +117,7 @@ class ReadFromCluster : public SourceStepWithFilter | |
| LoggerPtr log; | ||
|
|
||
| std::optional<RemoteQueryExecutor::Extension> extension; | ||
| std::optional<Tables> external_tables; | ||
|
|
||
| void createExtension(const ActionsDAG::Node * predicate); | ||
| ContextPtr updateSettings(const Settings & settings); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,6 +42,7 @@ namespace Setting | |
| extern const SettingsBool prefer_global_in_and_join; | ||
| extern const SettingsBool enable_add_distinct_to_in_subqueries; | ||
| extern const SettingsInt64 optimize_const_name_size; | ||
| extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode; | ||
| } | ||
|
|
||
| namespace ErrorCodes | ||
|
|
@@ -120,8 +121,9 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito | |
| using Base = InDepthQueryTreeVisitorWithContext<DistributedProductModeRewriteInJoinVisitor>; | ||
| using Base::Base; | ||
|
|
||
| explicit DistributedProductModeRewriteInJoinVisitor(const ContextPtr & context_) | ||
| explicit DistributedProductModeRewriteInJoinVisitor(const ContextPtr & context_, bool find_cross_join_) | ||
| : Base(context_) | ||
| , find_cross_join(find_cross_join_) | ||
| {} | ||
|
|
||
| struct InFunctionOrJoin | ||
|
|
@@ -157,9 +159,11 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito | |
| { | ||
| auto * function_node = node->as<FunctionNode>(); | ||
| auto * join_node = node->as<JoinNode>(); | ||
| CrossJoinNode * cross_join_node = find_cross_join ? node->as<CrossJoinNode>() : nullptr; | ||
|
|
||
| if ((function_node && isNameOfGlobalInFunction(function_node->getFunctionName())) || | ||
| (join_node && join_node->getLocality() == JoinLocality::Global)) | ||
| (join_node && join_node->getLocality() == JoinLocality::Global) || | ||
| cross_join_node) | ||
| { | ||
| InFunctionOrJoin in_function_or_join_entry; | ||
| in_function_or_join_entry.query_node = node; | ||
|
|
@@ -223,7 +227,9 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito | |
| replacement_table_expression->setTableExpressionModifiers(*table_expression_modifiers); | ||
| replacement_map.emplace(table_node.get(), std::move(replacement_table_expression)); | ||
| } | ||
| else if ((distributed_product_mode == DistributedProductMode::GLOBAL || getSettings()[Setting::prefer_global_in_and_join]) && | ||
| else if ((distributed_product_mode == DistributedProductMode::GLOBAL || | ||
| getSettings()[Setting::prefer_global_in_and_join] || | ||
| (find_cross_join && getSettings()[Setting::object_storage_cluster_join_mode] == ObjectStorageClusterJoinMode::GLOBAL)) && | ||
| !in_function_or_join_stack.empty()) | ||
| { | ||
| auto * in_or_join_node_to_modify = in_function_or_join_stack.back().query_node.get(); | ||
|
|
@@ -257,6 +263,8 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito | |
| std::vector<InFunctionOrJoin> in_function_or_join_stack; | ||
| std::unordered_map<const IQueryTreeNode *, QueryTreeNodePtr> replacement_map; | ||
| std::vector<InFunctionOrJoin> global_in_or_join_nodes; | ||
|
|
||
| bool find_cross_join = false; | ||
| }; | ||
|
|
||
| /** Replaces large constant values with `__getScalar` function calls to avoid | ||
|
|
@@ -504,14 +512,18 @@ QueryTreeNodePtr getSubqueryFromTableExpression( | |
|
|
||
| } | ||
|
|
||
| QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_context, QueryTreeNodePtr query_tree_to_modify, bool allow_global_join_for_right_table) | ||
| QueryTreeNodePtr buildQueryTreeForShard( | ||
| const PlannerContextPtr & planner_context, | ||
| QueryTreeNodePtr query_tree_to_modify, | ||
| bool allow_global_join_for_right_table, | ||
| bool find_cross_join) | ||
| { | ||
| CollectColumnSourceToColumnsVisitor collect_column_source_to_columns_visitor; | ||
| collect_column_source_to_columns_visitor.visit(query_tree_to_modify); | ||
|
|
||
| const auto & column_source_to_columns = collect_column_source_to_columns_visitor.getColumnSourceToColumns(); | ||
|
|
||
| DistributedProductModeRewriteInJoinVisitor visitor(planner_context->getQueryContext()); | ||
| DistributedProductModeRewriteInJoinVisitor visitor(planner_context->getQueryContext(), find_cross_join); | ||
| visitor.visit(query_tree_to_modify); | ||
|
|
||
| auto replacement_map = visitor.getReplacementMap(); | ||
|
|
@@ -550,6 +562,24 @@ QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_contex | |
| replacement_map.emplace(join_table_expression.get(), std::move(temporary_table_expression_node)); | ||
| continue; | ||
| } | ||
| if (auto * cross_join_node = global_in_or_join_node.query_node->as<CrossJoinNode>()) | ||
| { | ||
| auto tables_count = cross_join_node->getTableExpressions().size(); | ||
| for (size_t i = 1; i < tables_count; ++i) | ||
| { | ||
| QueryTreeNodePtr join_table_expression = cross_join_node->getTableExpressions()[i]; | ||
|
|
||
| auto subquery_node = getSubqueryFromTableExpression(join_table_expression, column_source_to_columns, planner_context->getQueryContext()); | ||
|
|
||
| auto temporary_table_expression_node = executeSubqueryNode(subquery_node, | ||
| planner_context->getMutableQueryContext(), | ||
| global_in_or_join_node.subquery_depth); | ||
| temporary_table_expression_node->setAlias(join_table_expression->getAlias()); | ||
|
|
||
| replacement_map.emplace(join_table_expression.get(), std::move(temporary_table_expression_node)); | ||
| } | ||
| continue; | ||
| } | ||
| if (auto * in_function_node = global_in_or_join_node.query_node->as<FunctionNode>()) | ||
| { | ||
| auto & in_function_subquery_node = in_function_node->getArguments().getNodes().at(1); | ||
|
|
@@ -661,7 +691,8 @@ class RewriteJoinToGlobalJoinVisitor : public InDepthQueryTreeVisitorWithContext | |
| { | ||
| if (auto * join_node = node->as<JoinNode>()) | ||
| { | ||
| bool prefer_local_join = getContext()->getSettingsRef()[Setting::parallel_replicas_prefer_local_join]; | ||
| bool prefer_local_join = getContext()->getSettingsRef()[Setting::parallel_replicas_prefer_local_join] | ||
| && getContext()->getSettingsRef()[Setting::object_storage_cluster_join_mode] != ObjectStorageClusterJoinMode::GLOBAL; | ||
|
Comment on lines
+694
to
+695
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Useful? React with 👍 / 👎.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, |
||
| bool should_use_global_join = !prefer_local_join || !allStoragesAreMergeTree(join_node->getRightTableExpression()); | ||
| if (should_use_global_join) | ||
| join_node->setLocality(JoinLocality::Global); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Setting
send_external_tableshere stores query-specific state on the sharedIStorageClusterobject, but that flag is later read/cleared inread(). For storage engines that reuse one instance across requests (for exampleStorageObjectStorageCluster), concurrent reads can interleave so one query clears the flag while another still needs it, causing the second query to skip sending generated_data_*external tables to shards and fail at execution time. This state needs to be local to a single read path (or otherwise synchronized) rather than kept as mutable object state.Useful? React with 👍 / 👎.