Skip to content

Commit 7799454

Browse files
authored
Merge pull request #1082 from Altinity/feature/antalya-25.8/s3cluster_global_join_fixes
25.8 Antalya: Fix joins with Iceberg tables
2 parents 8fd1ba5 + 2513c29 commit 7799454

File tree

13 files changed

+434
-58
lines changed

13 files changed

+434
-58
lines changed

src/Core/Settings.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1763,12 +1763,12 @@ Possible values:
17631763
DECLARE(ObjectStorageClusterJoinMode, object_storage_cluster_join_mode, ObjectStorageClusterJoinMode::ALLOW, R"(
17641764
Changes the behaviour of object storage cluster function or table.
17651765
1766-
ClickHouse applies this setting when the query contains the product of object storage cluster function ot table, i.e. when the query for a object storage cluster function ot table contains a non-GLOBAL subquery for the object storage cluster function ot table.
1766+
ClickHouse applies this setting when the query contains the product of object storage cluster function or table, i.e. when the query for a object storage cluster function or table contains a non-GLOBAL subquery for the object storage cluster function or table.
17671767
17681768
Restrictions:
17691769
17701770
- Only applied for JOIN subqueries.
1771-
- Only if the FROM section uses a object storage cluster function ot table.
1771+
- Only if the FROM section uses a object storage cluster function or table.
17721772
17731773
Possible values:
17741774

src/Core/SettingsChangesHistory.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
4646
{"lock_object_storage_task_distribution_ms", 500, 500, "Raised the value to 500 to avoid hoping tasks between executors."},
4747
{"allow_retries_in_cluster_requests", false, false, "New setting"},
4848
{"object_storage_remote_initiator", false, false, "New setting."},
49+
{"allow_experimental_export_merge_tree_part", false, false, "New setting."},
50+
{"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."},
4951
{"allow_experimental_export_merge_tree_part", false, true, "Turned ON by default for Antalya."},
5052
{"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."}
5153
});

src/Core/SettingsEnums.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ enum class DistributedProductMode : uint8_t
163163

164164
DECLARE_SETTING_ENUM(DistributedProductMode)
165165

166-
/// The setting for executing object storage cluster function ot table JOIN sections.
166+
/// The setting for executing object storage cluster function or table JOIN sections.
167167
enum class ObjectStorageClusterJoinMode : uint8_t
168168
{
169169
LOCAL, /// Convert to local query

src/Planner/PlannerJoinTree.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1370,7 +1370,7 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
13701370
/// Overall, IStorage::read -> FetchColumns returns normal column names (except Distributed, which is inconsistent)
13711371
/// Interpreter::getQueryPlan -> FetchColumns returns identifiers (why?) and this the reason for the bug ^ in Distributed
13721372
/// Hopefully there is no other case when we read from Distributed up to FetchColumns.
1373-
if (table_node && table_node->getStorage()->isRemote() && select_query_options.to_stage == QueryProcessingStage::FetchColumns)
1373+
if (table_node && table_node->getStorage()->isRemote())
13741374
updated_actions_dag_outputs.push_back(output_node);
13751375
else if (table_function_node && table_function_node->getStorage()->isRemote())
13761376
updated_actions_dag_outputs.push_back(output_node);

src/Storages/IStorageCluster.cpp

Lines changed: 99 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
#include <Analyzer/QueryTreeBuilder.h>
3232
#include <Analyzer/QueryNode.h>
3333
#include <Analyzer/ColumnNode.h>
34+
#include <Analyzer/JoinNode.h>
3435
#include <Analyzer/InDepthQueryTreeVisitor.h>
36+
#include <Analyzer/Utils.h>
3537
#include <Storages/StorageDistributed.h>
3638
#include <TableFunctions/TableFunctionFactory.h>
3739

@@ -112,7 +114,7 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisito
112114
using Base = InDepthQueryTreeVisitorWithContext<SearcherVisitor>;
113115
using Base::Base;
114116

115-
explicit SearcherVisitor(QueryTreeNodeType type_, ContextPtr context) : Base(context), type(type_) {}
117+
explicit SearcherVisitor(std::unordered_set<QueryTreeNodeType> types_, ContextPtr context) : Base(context), types(types_) {}
116118

117119
bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & /*child*/)
118120
{
@@ -126,15 +128,20 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisito
126128

127129
auto node_type = node->getNodeType();
128130

129-
if (node_type == type)
131+
if (types.contains(node_type))
132+
{
130133
passed_node = node;
134+
passed_type = node_type;
135+
}
131136
}
132137

133138
QueryTreeNodePtr getNode() const { return passed_node; }
139+
std::optional<QueryTreeNodeType> getType() const { return passed_type; }
134140

135141
private:
136-
QueryTreeNodeType type;
142+
std::unordered_set<QueryTreeNodeType> types;
137143
QueryTreeNodePtr passed_node;
144+
std::optional<QueryTreeNodeType> passed_type;
138145
};
139146

140147
/*
@@ -216,49 +223,71 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded(
216223
{
217224
case ObjectStorageClusterJoinMode::LOCAL:
218225
{
219-
auto modified_query_tree = query_tree->clone();
220-
bool need_modify = false;
221-
222-
SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context);
223-
table_function_searcher.visit(query_tree);
224-
auto table_function_node = table_function_searcher.getNode();
225-
if (!table_function_node)
226-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");
226+
auto info = getQueryTreeInfo(query_tree, context);
227227

228-
if (has_join)
228+
if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
229229
{
230-
auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send);
231-
auto query_tree_distributed = buildTableFunctionQueryTree(table_function, context);
232-
auto & table_function_ast = table_function->as<ASTFunction &>();
233-
query_tree_distributed->setAlias(table_function_ast.alias);
230+
auto modified_query_tree = query_tree->clone();
231+
232+
SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context);
233+
left_table_expression_searcher.visit(modified_query_tree);
234+
auto table_function_node = left_table_expression_searcher.getNode();
235+
if (!table_function_node)
236+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");
237+
238+
QueryTreeNodePtr query_tree_distributed;
239+
240+
auto & query_node = modified_query_tree->as<QueryNode &>();
241+
242+
if (info.has_join)
243+
{
244+
auto join_node = query_node.getJoinTree();
245+
query_tree_distributed = join_node->as<JoinNode>()->getLeftTableExpression()->clone();
246+
}
247+
else if (info.has_cross_join)
248+
{
249+
SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, context);
250+
join_searcher.visit(modified_query_tree);
251+
auto cross_join_node = join_searcher.getNode();
252+
if (!cross_join_node)
253+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find CROSS JOIN node");
254+
// CrossJoinNode contains vector of nodes. 0 is left expression, always exists.
255+
query_tree_distributed = cross_join_node->as<CrossJoinNode>()->getTableExpressions()[0]->clone();
256+
}
234257

235258
// Find add used columns from table function to make proper projection list
259+
// Need to do before changing WHERE condition
236260
CollectUsedColumnsForSourceVisitor collector(table_function_node, context);
237-
collector.visit(query_tree);
261+
collector.visit(modified_query_tree);
238262
const auto & columns = collector.getColumns();
239263

240-
auto & query_node = modified_query_tree->as<QueryNode &>();
241264
query_node.resolveProjectionColumns(columns);
242265
auto column_nodes_to_select = std::make_shared<ListNode>();
243266
column_nodes_to_select->getNodes().reserve(columns.size());
244267
for (auto & column : columns)
245268
column_nodes_to_select->getNodes().emplace_back(std::make_shared<ColumnNode>(column, table_function_node));
246269
query_node.getProjectionNode() = column_nodes_to_select;
247270

248-
// Left only table function to send on cluster nodes
249-
modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed);
271+
if (info.has_local_columns_in_where)
272+
{
273+
if (query_node.getPrewhere())
274+
removeExpressionsThatDoNotDependOnTableIdentifiers(query_node.getPrewhere(), table_function_node, context);
275+
if (query_node.getWhere())
276+
removeExpressionsThatDoNotDependOnTableIdentifiers(query_node.getWhere(), table_function_node, context);
277+
}
250278

251-
need_modify = true;
252-
}
279+
query_node.getOrderByNode() = std::make_shared<ListNode>();
280+
query_node.getGroupByNode() = std::make_shared<ListNode>();
253281

254-
if (has_local_columns_in_where)
255-
{
256-
auto & query_node = modified_query_tree->as<QueryNode &>();
257-
query_node.getWhere() = {};
258-
}
282+
if (query_tree_distributed)
283+
{
284+
// Left only table function to send on cluster nodes
285+
modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed);
286+
}
259287

260-
if (need_modify)
261288
query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree);
289+
}
290+
262291
return;
263292
}
264293
case ObjectStorageClusterJoinMode::GLOBAL:
@@ -492,38 +521,59 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
492521
pipeline.init(std::move(pipe));
493522
}
494523

495-
QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
496-
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo & query_info) const
524+
IStorageCluster::QueryTreeInfo IStorageCluster::getQueryTreeInfo(QueryTreeNodePtr query_tree, ContextPtr context)
497525
{
498-
auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode];
526+
QueryTreeInfo info;
499527

500-
if (object_storage_cluster_join_mode != ObjectStorageClusterJoinMode::ALLOW)
501-
{
502-
if (!context->getSettingsRef()[Setting::allow_experimental_analyzer])
503-
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
504-
"object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true");
528+
SearcherVisitor join_searcher({QueryTreeNodeType::JOIN, QueryTreeNodeType::CROSS_JOIN}, context);
529+
join_searcher.visit(query_tree);
505530

506-
SearcherVisitor join_searcher(QueryTreeNodeType::JOIN, context);
507-
join_searcher.visit(query_info.query_tree);
508-
if (join_searcher.getNode())
509-
has_join = true;
531+
if (join_searcher.getNode())
532+
{
533+
if (join_searcher.getType() == QueryTreeNodeType::JOIN)
534+
info.has_join = true;
535+
else
536+
info.has_cross_join = true;
537+
}
510538

511-
SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context);
512-
table_function_searcher.visit(query_info.query_tree);
513-
auto table_function_node = table_function_searcher.getNode();
514-
if (!table_function_node)
515-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");
539+
SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context);
540+
left_table_expression_searcher.visit(query_tree);
541+
auto table_function_node = left_table_expression_searcher.getNode();
542+
if (!table_function_node)
543+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table or table function node");
516544

545+
auto & query_node = query_tree->as<QueryNode &>();
546+
if (query_node.hasWhere() || query_node.hasPrewhere())
547+
{
517548
CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true);
518-
auto & query_node = query_info.query_tree->as<QueryNode &>();
549+
if (query_node.hasPrewhere())
550+
collector_where.visit(query_node.getPrewhere());
519551
if (query_node.hasWhere())
520552
collector_where.visit(query_node.getWhere());
521553

522-
// Can't use 'WHERE' on remote node if it contains columns from other sources
554+
// SELECT x FROM datalake.table WHERE x IN local.table.
555+
// Need to modify 'WHERE' on remote node if it contains columns from other sources
556+
// because remote node might not have those sources.
523557
if (!collector_where.getColumns().empty())
524-
has_local_columns_in_where = true;
558+
info.has_local_columns_in_where = true;
559+
}
560+
561+
return info;
562+
}
563+
564+
QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
565+
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo & query_info) const
566+
{
567+
auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode];
568+
569+
if (object_storage_cluster_join_mode != ObjectStorageClusterJoinMode::ALLOW)
570+
{
571+
if (!context->getSettingsRef()[Setting::allow_experimental_analyzer])
572+
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
573+
"object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true");
525574

526-
if (has_join || has_local_columns_in_where)
575+
auto info = getQueryTreeInfo(query_info.query_tree, context);
576+
if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
527577
return QueryProcessingStage::Enum::FetchColumns;
528578
}
529579

src/Storages/IStorageCluster.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,14 @@ class IStorageCluster : public IStorage
105105
LoggerPtr log;
106106
String cluster_name;
107107

108-
mutable bool has_join = false;
109-
mutable bool has_local_columns_in_where = false;
108+
struct QueryTreeInfo
109+
{
110+
bool has_join = false;
111+
bool has_cross_join = false;
112+
bool has_local_columns_in_where = false;
113+
};
114+
115+
static QueryTreeInfo getQueryTreeInfo(QueryTreeNodePtr query_tree, ContextPtr context);
110116
};
111117

112118

src/Storages/ObjectStorage/StorageObjectStorageSource.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ Chunk StorageObjectStorageSource::generate()
364364
{
365365
chunk.addColumn(constant_column.first,
366366
constant_column.second.name_and_type.type->createColumnConst(
367-
chunk.getNumRows(), constant_column.second.value));
367+
chunk.getNumRows(), constant_column.second.value)->convertToFullColumnIfConst());
368368
}
369369

370370
#if USE_PARQUET && USE_AWS_S3

src/Storages/extractTableFunctionFromSelectQuery.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query)
2626
return table_expression ? table_expression->table_function : nullptr;
2727
}
2828

29+
ASTPtr extractTableASTPtrFromSelectQuery(ASTPtr & query)
30+
{
31+
auto table_expression = extractTableExpressionASTPtrFromSelectQuery(query);
32+
return table_expression ? table_expression->database_and_table_name : nullptr;
33+
}
34+
2935
ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query)
3036
{
3137
auto table_function_ast = extractTableFunctionASTPtrFromSelectQuery(query);

src/Storages/extractTableFunctionFromSelectQuery.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ struct ASTTableExpression;
1010

1111
ASTTableExpression * extractTableExpressionASTPtrFromSelectQuery(ASTPtr & query);
1212
ASTPtr extractTableFunctionASTPtrFromSelectQuery(ASTPtr & query);
13+
ASTPtr extractTableASTPtrFromSelectQuery(ASTPtr & query);
1314
ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query);
1415
ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query);
1516

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<clickhouse>
2+
<remote_servers>
3+
<cluster_simple>
4+
<shard>
5+
<replica>
6+
<host>node1</host>
7+
<port>9000</port>
8+
</replica>
9+
</shard>
10+
</cluster_simple>
11+
</remote_servers>
12+
</clickhouse>

0 commit comments

Comments
 (0)