2626#include < Storages/IStorage.h>
2727#include < Storages/SelectQueryInfo.h>
2828#include < Storages/StorageDictionary.h>
29+ #include < Planner/Utils.h>
30+ #include < Analyzer/QueryTreeBuilder.h>
31+ #include < Analyzer/QueryNode.h>
32+ #include < Analyzer/ColumnNode.h>
33+ #include < Analyzer/InDepthQueryTreeVisitor.h>
2934#include < Storages/StorageDistributed.h>
3035#include < TableFunctions/TableFunctionFactory.h>
3136#include < Storages/extractTableFunctionFromSelectQuery.h>
@@ -47,12 +52,14 @@ namespace Setting
4752 extern const SettingsString cluster_for_parallel_replicas;
4853 extern const SettingsNonZeroUInt64 max_parallel_replicas;
4954 extern const SettingsUInt64 object_storage_max_nodes;
55+ extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode;
5056 extern const SettingsBool object_storage_remote_initiator;
5157}
5258
5359namespace ErrorCodes
5460{
5561 extern const int NOT_IMPLEMENTED;
62+ extern const int LOGICAL_ERROR;
5663}
5764
5865namespace ErrorCodes
@@ -89,6 +96,175 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
8996 extension = storage->getTaskIteratorExtension (predicate, filter_actions_dag, context, cluster);
9097}
9198
99+ namespace
100+ {
101+
102+ /*
103+ Helping class to find in query tree first node of required type
104+ */
105+ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext <SearcherVisitor>
106+ {
107+ public:
108+ using Base = InDepthQueryTreeVisitorWithContext<SearcherVisitor>;
109+ using Base::Base;
110+
111+ explicit SearcherVisitor (QueryTreeNodeType type_, ContextPtr context) : Base(context), type(type_) {}
112+
113+ bool needChildVisit (QueryTreeNodePtr &, QueryTreeNodePtr & /* child*/ )
114+ {
115+ return !passed_node;
116+ }
117+
118+ void enterImpl (QueryTreeNodePtr & node)
119+ {
120+ if (passed_node)
121+ return ;
122+
123+ auto node_type = node->getNodeType ();
124+
125+ if (node_type == type)
126+ passed_node = node;
127+ }
128+
129+ QueryTreeNodePtr getNode () const { return passed_node; }
130+
131+ private:
132+ QueryTreeNodeType type;
133+ QueryTreeNodePtr passed_node;
134+ };
135+
136+ /*
137+ Helping class to find all used columns with specific source
138+ */
139+ class CollectUsedColumnsForSourceVisitor : public InDepthQueryTreeVisitorWithContext <CollectUsedColumnsForSourceVisitor>
140+ {
141+ public:
142+ using Base = InDepthQueryTreeVisitorWithContext<CollectUsedColumnsForSourceVisitor>;
143+ using Base::Base;
144+
145+ explicit CollectUsedColumnsForSourceVisitor (
146+ QueryTreeNodePtr source_,
147+ ContextPtr context,
148+ bool collect_columns_from_other_sources_ = false )
149+ : Base(context)
150+ , source(source_)
151+ , collect_columns_from_other_sources(collect_columns_from_other_sources_)
152+ {}
153+
154+ void enterImpl (QueryTreeNodePtr & node)
155+ {
156+ auto node_type = node->getNodeType ();
157+
158+ if (node_type != QueryTreeNodeType::COLUMN)
159+ return ;
160+
161+ auto & column_node = node->as <ColumnNode &>();
162+ auto column_source = column_node.getColumnSourceOrNull ();
163+ if (!column_source)
164+ return ;
165+
166+ if ((column_source == source) != collect_columns_from_other_sources)
167+ {
168+ const auto & name = column_node.getColumnName ();
169+ if (!names.count (name))
170+ {
171+ columns.emplace_back (column_node.getColumn ());
172+ names.insert (name);
173+ }
174+ }
175+ }
176+
177+ const NamesAndTypes & getColumns () const { return columns; }
178+
179+ private:
180+ std::unordered_set<std::string> names;
181+ QueryTreeNodePtr source;
182+ NamesAndTypes columns;
183+ bool collect_columns_from_other_sources;
184+ };
185+
186+ };
187+
188+ /*
189+ Try to make subquery to send on nodes
190+ Converts
191+
192+ SELECT s3.c1, s3.c2, t.c3
193+ FROM
194+ s3Cluster(...) AS s3
195+ JOIN
196+ localtable as t
197+ ON s3.key == t.key
198+
199+ to
200+
201+ SELECT s3.c1, s3.c2, s3.key
202+ FROM
203+ s3Cluster(...) AS s3
204+ */
205+ void IStorageCluster::updateQueryWithJoinToSendIfNeeded (
206+ ASTPtr & query_to_send,
207+ QueryTreeNodePtr query_tree,
208+ const ContextPtr & context)
209+ {
210+ auto object_storage_cluster_join_mode = context->getSettingsRef ()[Setting::object_storage_cluster_join_mode];
211+ switch (object_storage_cluster_join_mode)
212+ {
213+ case ObjectStorageClusterJoinMode::LOCAL:
214+ {
215+ auto modified_query_tree = query_tree->clone ();
216+ bool need_modify = false ;
217+
218+ SearcherVisitor table_function_searcher (QueryTreeNodeType::TABLE_FUNCTION, context);
219+ table_function_searcher.visit (query_tree);
220+ auto table_function_node = table_function_searcher.getNode ();
221+ if (!table_function_node)
222+ throw Exception (ErrorCodes::LOGICAL_ERROR, " Can't find table function node" );
223+
224+ if (has_join)
225+ {
226+ auto table_function = extractTableFunctionASTPtrFromSelectQuery (query_to_send);
227+ auto query_tree_distributed = buildTableFunctionQueryTree (table_function, context);
228+ auto & table_function_ast = table_function->as <ASTFunction &>();
229+ query_tree_distributed->setAlias (table_function_ast.alias );
230+
231+ // Find add used columns from table function to make proper projection list
232+ CollectUsedColumnsForSourceVisitor collector (table_function_node, context);
233+ collector.visit (query_tree);
234+ const auto & columns = collector.getColumns ();
235+
236+ auto & query_node = modified_query_tree->as <QueryNode &>();
237+ query_node.resolveProjectionColumns (columns);
238+ auto column_nodes_to_select = std::make_shared<ListNode>();
239+ column_nodes_to_select->getNodes ().reserve (columns.size ());
240+ for (auto & column : columns)
241+ column_nodes_to_select->getNodes ().emplace_back (std::make_shared<ColumnNode>(column, table_function_node));
242+ query_node.getProjectionNode () = column_nodes_to_select;
243+
244+ // Left only table function to send on cluster nodes
245+ modified_query_tree = modified_query_tree->cloneAndReplace (query_node.getJoinTree (), query_tree_distributed);
246+
247+ need_modify = true ;
248+ }
249+
250+ if (has_local_columns_in_where)
251+ {
252+ auto & query_node = modified_query_tree->as <QueryNode &>();
253+ query_node.getWhere () = {};
254+ }
255+
256+ if (need_modify)
257+ query_to_send = queryNodeToDistributedSelectQuery (modified_query_tree);
258+ return ;
259+ }
260+ case ObjectStorageClusterJoinMode::GLOBAL:
261+ // TODO
262+ throw Exception (ErrorCodes::NOT_IMPLEMENTED, " `Global` mode for `object_storage_cluster_join_mode` setting is unimplemented for now" );
263+ case ObjectStorageClusterJoinMode::ALLOW: // Do nothing special
264+ return ;
265+ }
266+ }
267+
92268// / The code executes on initiator
93269void IStorageCluster::read (
94270 QueryPlan & query_plan,
@@ -119,13 +295,15 @@ void IStorageCluster::read(
119295 Block sample_block;
120296 ASTPtr query_to_send = query_info.query ;
121297
298+ updateQueryWithJoinToSendIfNeeded (query_to_send, query_info.query_tree , context);
299+
122300 if (settings[Setting::allow_experimental_analyzer])
123301 {
124- sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock (query_info. query , context, SelectQueryOptions (processed_stage));
302+ sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock (query_to_send , context, SelectQueryOptions (processed_stage));
125303 }
126304 else
127305 {
128- auto interpreter = InterpreterSelectQuery (query_info. query , context, SelectQueryOptions (processed_stage).analyze ());
306+ auto interpreter = InterpreterSelectQuery (query_to_send , context, SelectQueryOptions (processed_stage).analyze ());
129307 sample_block = interpreter.getSampleBlock ();
130308 query_to_send = interpreter.getQueryInfo ().query ->clone ();
131309 }
@@ -144,7 +322,7 @@ void IStorageCluster::read(
144322 }
145323
146324 RestoreQualifiedNamesVisitor::Data data;
147- data.distributed_table = DatabaseAndTableWithAlias (*getTableExpression (query_info. query ->as <ASTSelectQuery &>(), 0 ));
325+ data.distributed_table = DatabaseAndTableWithAlias (*getTableExpression (query_to_send ->as <ASTSelectQuery &>(), 0 ));
148326 data.remote_table .database = context->getCurrentDatabase ();
149327 data.remote_table .table = getName ();
150328 RestoreQualifiedNamesVisitor (data).visit (query_to_send);
@@ -309,8 +487,40 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
309487}
310488
311489QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage (
312- ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo &) const
490+ ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo & query_info ) const
313491{
492+ auto object_storage_cluster_join_mode = context->getSettingsRef ()[Setting::object_storage_cluster_join_mode];
493+
494+ if (object_storage_cluster_join_mode != ObjectStorageClusterJoinMode::ALLOW)
495+ {
496+ if (!context->getSettingsRef ()[Setting::allow_experimental_analyzer])
497+ throw Exception (ErrorCodes::NOT_IMPLEMENTED,
498+ " object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true" );
499+
500+ SearcherVisitor join_searcher (QueryTreeNodeType::JOIN, context);
501+ join_searcher.visit (query_info.query_tree );
502+ if (join_searcher.getNode ())
503+ has_join = true ;
504+
505+ SearcherVisitor table_function_searcher (QueryTreeNodeType::TABLE_FUNCTION, context);
506+ table_function_searcher.visit (query_info.query_tree );
507+ auto table_function_node = table_function_searcher.getNode ();
508+ if (!table_function_node)
509+ throw Exception (ErrorCodes::LOGICAL_ERROR, " Can't find table function node" );
510+
511+ CollectUsedColumnsForSourceVisitor collector_where (table_function_node, context, true );
512+ auto & query_node = query_info.query_tree ->as <QueryNode &>();
513+ if (query_node.hasWhere ())
514+ collector_where.visit (query_node.getWhere ());
515+
516+ // Can't use 'WHERE' on remote node if it contains columns from other sources
517+ if (!collector_where.getColumns ().empty ())
518+ has_local_columns_in_where = true ;
519+
520+ if (has_join || has_local_columns_in_where)
521+ return QueryProcessingStage::Enum::FetchColumns;
522+ }
523+
314524 // / Initiator executes query on remote node.
315525 if (context->getClientInfo ().query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
316526 if (to_stage >= QueryProcessingStage::Enum::WithMergeableState)
0 commit comments