Skip to content

Commit e6f0a5c

Browse files
Enmkianton-ru
authored and committed
Merge pull request #972 from Altinity/feature/antalya-25.6.5/s3cluster_global_join
1 parent 06e34b3 commit e6f0a5c

File tree

11 files changed

+403
-7
lines changed

11 files changed

+403
-7
lines changed

src/Core/Settings.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1760,6 +1760,22 @@ Possible values:
17601760
- `global` — Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.`
17611761
- `allow` — Allows the use of these types of subqueries.
17621762
)", IMPORTANT) \
1763+
DECLARE(ObjectStorageClusterJoinMode, object_storage_cluster_join_mode, ObjectStorageClusterJoinMode::ALLOW, R"(
1764+
Changes the behaviour of object storage cluster function or table.
1765+
1766+
ClickHouse applies this setting when the query contains the product of object storage cluster function or table, i.e. when the query for an object storage cluster function or table contains a non-GLOBAL subquery for the object storage cluster function or table.
1767+
1768+
Restrictions:
1769+
1770+
- Only applied for JOIN subqueries.
1771+
- Only if the FROM section uses an object storage cluster function or table.
1772+
1773+
Possible values:
1774+
1775+
- `local` — Replaces the database and table in the subquery with local ones for the destination server (shard), leaving the normal `IN`/`JOIN.`
1776+
- `global` — Unsupported for now. Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.`
1777+
- `allow` — Default value. Allows the use of these types of subqueries.
1778+
)", 0) \
17631779
\
17641780
DECLARE(UInt64, max_concurrent_queries_for_all_users, 0, R"(
17651781
Throw exception if the value of this setting is less or equal than the current number of simultaneously processed queries.

src/Core/Settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class WriteBuffer;
5858
M(CLASS_NAME, DistributedCachePoolBehaviourOnLimit) /* Cloud only */ \
5959
M(CLASS_NAME, DistributedDDLOutputMode) \
6060
M(CLASS_NAME, DistributedProductMode) \
61+
M(CLASS_NAME, ObjectStorageClusterJoinMode) \
6162
M(CLASS_NAME, Double) \
6263
M(CLASS_NAME, EscapingRule) \
6364
M(CLASS_NAME, Float) \

src/Core/SettingsChangesHistory.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,16 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
132132
{"distributed_plan_force_shuffle_aggregation", 0, 0, "New experimental setting"},
133133
{"allow_experimental_insert_into_iceberg", false, false, "New setting."},
134134
/// RELEASE CLOSED
135+
{"allow_experimental_database_iceberg", false, true, "Turned ON by default for Antalya"},
136+
{"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"},
137+
{"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya"},
138+
{"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"},
139+
{"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
140+
{"object_storage_cluster", "", "", "New setting"},
141+
{"object_storage_max_nodes", 0, 0, "New setting"},
142+
{"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
143+
{"object_storage_remote_initiator", false, false, "New setting."},
144+
{"allow_experimental_export_merge_tree_part", false, false, "New setting."},
135145
});
136146
addSettingsChanges(settings_changes_history, "25.6",
137147
{

src/Core/SettingsEnums.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,11 @@ IMPLEMENT_SETTING_ENUM(DistributedProductMode, ErrorCodes::UNKNOWN_DISTRIBUTED_P
9090
{"global", DistributedProductMode::GLOBAL},
9191
{"allow", DistributedProductMode::ALLOW}})
9292

93+
/// Textual values accepted by the `object_storage_cluster_join_mode` setting ("local", "global", "allow")
/// and the ObjectStorageClusterJoinMode enumerator each of them maps to.
/// NOTE(review): BAD_ARGUMENTS is presumably the error code reported for an unrecognized value — confirm against the macro.
IMPLEMENT_SETTING_ENUM(ObjectStorageClusterJoinMode, ErrorCodes::BAD_ARGUMENTS,
94+
{{"local", ObjectStorageClusterJoinMode::LOCAL},
95+
{"global", ObjectStorageClusterJoinMode::GLOBAL},
96+
{"allow", ObjectStorageClusterJoinMode::ALLOW}})
97+
9398

9499
IMPLEMENT_SETTING_ENUM(QueryResultCacheNondeterministicFunctionHandling, ErrorCodes::BAD_ARGUMENTS,
95100
{{"throw", QueryResultCacheNondeterministicFunctionHandling::Throw},

src/Core/SettingsEnums.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,16 @@ enum class DistributedProductMode : uint8_t
163163

164164
DECLARE_SETTING_ENUM(DistributedProductMode)
165165

166+
/// The setting for executing JOIN sections of a query over an object storage cluster function or table
/// (`object_storage_cluster_join_mode`).
167+
enum class ObjectStorageClusterJoinMode : uint8_t
168+
{
169+
LOCAL, /// Convert to local query: only the object storage cluster part is sent to the cluster nodes
170+
GLOBAL, /// Convert to global query (not implemented yet, selecting it throws NOT_IMPLEMENTED)
171+
ALLOW /// Enable: the query is sent without any special handling
172+
};
173+
174+
DECLARE_SETTING_ENUM(ObjectStorageClusterJoinMode)
175+
166176
/// How the query result cache handles queries with non-deterministic functions, e.g. now()
167177
enum class QueryResultCacheNondeterministicFunctionHandling : uint8_t
168178
{

src/Planner/PlannerJoinTree.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1372,6 +1372,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
13721372
/// Hopefully there is no other case when we read from Distributed up to FetchColumns.
13731373
if (table_node && table_node->getStorage()->isRemote() && select_query_options.to_stage == QueryProcessingStage::FetchColumns)
13741374
updated_actions_dag_outputs.push_back(output_node);
1375+
else if (table_function_node && table_function_node->getStorage()->isRemote())
1376+
updated_actions_dag_outputs.push_back(output_node);
13751377
}
13761378
else
13771379
updated_actions_dag_outputs.push_back(&rename_actions_dag.addAlias(*output_node, *column_identifier));

src/Storages/IStorageCluster.cpp

Lines changed: 219 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@
2222
#include <Storages/IStorage.h>
2323
#include <Storages/SelectQueryInfo.h>
2424
#include <Storages/StorageDictionary.h>
25+
#include <Storages/extractTableFunctionFromSelectQuery.h>
26+
#include <Planner/Utils.h>
27+
#include <Analyzer/QueryTreeBuilder.h>
28+
#include <Analyzer/QueryNode.h>
29+
#include <Analyzer/ColumnNode.h>
30+
#include <Analyzer/InDepthQueryTreeVisitor.h>
2531

2632
#include <algorithm>
2733
#include <memory>
@@ -39,6 +45,12 @@ namespace Setting
3945
extern const SettingsBool parallel_replicas_local_plan;
4046
extern const SettingsString cluster_for_parallel_replicas;
4147
extern const SettingsNonZeroUInt64 max_parallel_replicas;
48+
extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode;
49+
}
50+
51+
namespace ErrorCodes
52+
{
53+
extern const int LOGICAL_ERROR;
4254
}
4355

4456
namespace ErrorCodes
@@ -79,6 +91,175 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
7991
cluster);
8092
}
8193

94+
namespace
95+
{
96+
97+
/*
98+
Helping class to find in query tree first node of required type
99+
*/
100+
class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisitor>
101+
{
102+
public:
103+
using Base = InDepthQueryTreeVisitorWithContext<SearcherVisitor>;
104+
using Base::Base;
105+
106+
explicit SearcherVisitor(QueryTreeNodeType type_, ContextPtr context) : Base(context), type(type_) {}
107+
108+
bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & /*child*/)
109+
{
110+
return !passed_node;
111+
}
112+
113+
void enterImpl(QueryTreeNodePtr & node)
114+
{
115+
if (passed_node)
116+
return;
117+
118+
auto node_type = node->getNodeType();
119+
120+
if (node_type == type)
121+
passed_node = node;
122+
}
123+
124+
QueryTreeNodePtr getNode() const { return passed_node; }
125+
126+
private:
127+
QueryTreeNodeType type;
128+
QueryTreeNodePtr passed_node;
129+
};
130+
131+
/*
132+
Helping class to find all used columns with specific source
133+
*/
134+
class CollectUsedColumnsForSourceVisitor : public InDepthQueryTreeVisitorWithContext<CollectUsedColumnsForSourceVisitor>
135+
{
136+
public:
137+
using Base = InDepthQueryTreeVisitorWithContext<CollectUsedColumnsForSourceVisitor>;
138+
using Base::Base;
139+
140+
explicit CollectUsedColumnsForSourceVisitor(
141+
QueryTreeNodePtr source_,
142+
ContextPtr context,
143+
bool collect_columns_from_other_sources_ = false)
144+
: Base(context)
145+
, source(source_)
146+
, collect_columns_from_other_sources(collect_columns_from_other_sources_)
147+
{}
148+
149+
void enterImpl(QueryTreeNodePtr & node)
150+
{
151+
auto node_type = node->getNodeType();
152+
153+
if (node_type != QueryTreeNodeType::COLUMN)
154+
return;
155+
156+
auto & column_node = node->as<ColumnNode &>();
157+
auto column_source = column_node.getColumnSourceOrNull();
158+
if (!column_source)
159+
return;
160+
161+
if ((column_source == source) != collect_columns_from_other_sources)
162+
{
163+
const auto & name = column_node.getColumnName();
164+
if (!names.count(name))
165+
{
166+
columns.emplace_back(column_node.getColumn());
167+
names.insert(name);
168+
}
169+
}
170+
}
171+
172+
const NamesAndTypes & getColumns() const { return columns; }
173+
174+
private:
175+
std::unordered_set<std::string> names;
176+
QueryTreeNodePtr source;
177+
NamesAndTypes columns;
178+
bool collect_columns_from_other_sources;
179+
};
180+
181+
};
182+
183+
/*
184+
Try to make subquery to send on nodes
185+
Converts
186+
187+
SELECT s3.c1, s3.c2, t.c3
188+
FROM
189+
s3Cluster(...) AS s3
190+
JOIN
191+
localtable as t
192+
ON s3.key == t.key
193+
194+
to
195+
196+
SELECT s3.c1, s3.c2, s3.key
197+
FROM
198+
s3Cluster(...) AS s3
199+
*/
200+
void IStorageCluster::updateQueryWithJoinToSendIfNeeded(
201+
ASTPtr & query_to_send,
202+
QueryTreeNodePtr query_tree,
203+
const ContextPtr & context)
204+
{
205+
auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode];
206+
switch (object_storage_cluster_join_mode)
207+
{
208+
case ObjectStorageClusterJoinMode::LOCAL:
209+
{
210+
auto modified_query_tree = query_tree->clone();
211+
bool need_modify = false;
212+
213+
SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context);
214+
table_function_searcher.visit(query_tree);
215+
auto table_function_node = table_function_searcher.getNode();
216+
if (!table_function_node)
217+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");
218+
219+
if (has_join)
220+
{
221+
auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send);
222+
auto query_tree_distributed = buildTableFunctionQueryTree(table_function, context);
223+
auto & table_function_ast = table_function->as<ASTFunction &>();
224+
query_tree_distributed->setAlias(table_function_ast.alias);
225+
226+
// Find add used columns from table function to make proper projection list
227+
CollectUsedColumnsForSourceVisitor collector(table_function_node, context);
228+
collector.visit(query_tree);
229+
const auto & columns = collector.getColumns();
230+
231+
auto & query_node = modified_query_tree->as<QueryNode &>();
232+
query_node.resolveProjectionColumns(columns);
233+
auto column_nodes_to_select = std::make_shared<ListNode>();
234+
column_nodes_to_select->getNodes().reserve(columns.size());
235+
for (auto & column : columns)
236+
column_nodes_to_select->getNodes().emplace_back(std::make_shared<ColumnNode>(column, table_function_node));
237+
query_node.getProjectionNode() = column_nodes_to_select;
238+
239+
// Left only table function to send on cluster nodes
240+
modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed);
241+
242+
need_modify = true;
243+
}
244+
245+
if (has_local_columns_in_where)
246+
{
247+
auto & query_node = modified_query_tree->as<QueryNode &>();
248+
query_node.getWhere() = {};
249+
}
250+
251+
if (need_modify)
252+
query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree);
253+
return;
254+
}
255+
case ObjectStorageClusterJoinMode::GLOBAL:
256+
// TODO
257+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "`Global` mode for `object_storage_cluster_join_mode` setting is unimplemented for now");
258+
case ObjectStorageClusterJoinMode::ALLOW: // Do nothing special
259+
return;
260+
}
261+
}
262+
82263
/// The code executes on initiator
83264
void IStorageCluster::read(
84265
QueryPlan & query_plan,
@@ -100,21 +281,23 @@ void IStorageCluster::read(
100281
SharedHeader sample_block;
101282
ASTPtr query_to_send = query_info.query;
102283

284+
updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context);
285+
103286
if (context->getSettingsRef()[Setting::allow_experimental_analyzer])
104287
{
105-
sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage));
288+
sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_to_send, context, SelectQueryOptions(processed_stage));
106289
}
107290
else
108291
{
109-
auto interpreter = InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze());
292+
auto interpreter = InterpreterSelectQuery(query_to_send, context, SelectQueryOptions(processed_stage).analyze());
110293
sample_block = interpreter.getSampleBlock();
111294
query_to_send = interpreter.getQueryInfo().query->clone();
112295
}
113296

114297
updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context);
115298

116299
RestoreQualifiedNamesVisitor::Data data;
117-
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as<ASTSelectQuery &>(), 0));
300+
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_to_send->as<ASTSelectQuery &>(), 0));
118301
data.remote_table.database = context->getCurrentDatabase();
119302
data.remote_table.table = getName();
120303
RestoreQualifiedNamesVisitor(data).visit(query_to_send);
@@ -209,8 +392,40 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
209392
}
210393

211394
QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
212-
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo &) const
395+
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo & query_info) const
213396
{
397+
auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode];
398+
399+
if (object_storage_cluster_join_mode != ObjectStorageClusterJoinMode::ALLOW)
400+
{
401+
if (!context->getSettingsRef()[Setting::allow_experimental_analyzer])
402+
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
403+
"object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true");
404+
405+
SearcherVisitor join_searcher(QueryTreeNodeType::JOIN, context);
406+
join_searcher.visit(query_info.query_tree);
407+
if (join_searcher.getNode())
408+
has_join = true;
409+
410+
SearcherVisitor table_function_searcher(QueryTreeNodeType::TABLE_FUNCTION, context);
411+
table_function_searcher.visit(query_info.query_tree);
412+
auto table_function_node = table_function_searcher.getNode();
413+
if (!table_function_node)
414+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");
415+
416+
CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true);
417+
auto & query_node = query_info.query_tree->as<QueryNode &>();
418+
if (query_node.hasWhere())
419+
collector_where.visit(query_node.getWhere());
420+
421+
// Can't use 'WHERE' on remote node if it contains columns from other sources
422+
if (!collector_where.getColumns().empty())
423+
has_local_columns_in_where = true;
424+
425+
if (has_join || has_local_columns_in_where)
426+
return QueryProcessingStage::Enum::FetchColumns;
427+
}
428+
214429
/// Initiator executes query on remote node.
215430
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
216431
if (to_stage >= QueryProcessingStage::Enum::WithMergeableState)

src/Storages/IStorageCluster.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,14 @@ class IStorageCluster : public IStorage
5353
protected:
5454
virtual void updateBeforeRead(const ContextPtr &) {}
5555
virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {}
56+
void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, QueryTreeNodePtr query_tree, const ContextPtr & context);
5657

5758
private:
5859
LoggerPtr log;
5960
String cluster_name;
61+
62+
mutable bool has_join = false;
63+
mutable bool has_local_columns_in_where = false;
6064
};
6165

6266

0 commit comments

Comments
 (0)