Skip to content

Commit 43825ba

Browse files
committed
Some fixes
1 parent 38a45e4 commit 43825ba

File tree

3 files changed

+152
-33
lines changed

3 files changed

+152
-33
lines changed

src/Storages/IStorageCluster.cpp

Lines changed: 47 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include <Analyzer/ColumnNode.h>
3434
#include <Analyzer/JoinNode.h>
3535
#include <Analyzer/InDepthQueryTreeVisitor.h>
36+
#include <Analyzer/Utils.h>
3637
#include <Storages/StorageDistributed.h>
3738
#include <TableFunctions/TableFunctionFactory.h>
3839

@@ -222,37 +223,40 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded(
222223
{
223224
case ObjectStorageClusterJoinMode::LOCAL:
224225
{
225-
auto modified_query_tree = query_tree->clone();
226-
bool need_modify = false;
226+
if (has_join || has_local_columns_in_where)
227+
{
228+
auto modified_query_tree = query_tree->clone();
227229

228-
SearcherVisitor table_function_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context);
229-
table_function_searcher.visit(query_tree);
230-
auto table_function_node = table_function_searcher.getNode();
231-
if (!table_function_node)
232-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");
230+
SearcherVisitor table_function_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context);
231+
table_function_searcher.visit(modified_query_tree);
232+
auto table_function_node = table_function_searcher.getNode();
233+
if (!table_function_node)
234+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");
233235

234-
if (has_join)
235-
{
236236
QueryTreeNodePtr query_tree_distributed;
237237

238238
auto & query_node = modified_query_tree->as<QueryNode &>();
239239

240-
if (table_function_searcher.getType().value() == QueryTreeNodeType::TABLE_FUNCTION)
241-
{
242-
auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send);
243-
query_tree_distributed = buildTableFunctionQueryTree(table_function, context);
244-
auto & table_function_ast = table_function->as<ASTFunction &>();
245-
query_tree_distributed->setAlias(table_function_ast.alias);
246-
}
247-
else
240+
if (has_join)
248241
{
249-
auto join_node = query_node.getJoinTree();
250-
query_tree_distributed = join_node->as<JoinNode>()->getLeftTableExpression()->clone();
242+
if (table_function_searcher.getType().value() == QueryTreeNodeType::TABLE_FUNCTION)
243+
{
244+
auto table_function = extractTableFunctionASTPtrFromSelectQuery(query_to_send);
245+
query_tree_distributed = buildTableFunctionQueryTree(table_function, context);
246+
auto & table_function_ast = table_function->as<ASTFunction &>();
247+
query_tree_distributed->setAlias(table_function_ast.alias);
248+
}
249+
else
250+
{
251+
auto join_node = query_node.getJoinTree();
252+
query_tree_distributed = join_node->as<JoinNode>()->getLeftTableExpression()->clone();
253+
}
251254
}
252255

253256
// Find and add used columns from table function to make proper projection list
257+
// Must be done before changing the WHERE condition
254258
CollectUsedColumnsForSourceVisitor collector(table_function_node, context);
255-
collector.visit(query_tree);
259+
collector.visit(modified_query_tree);
256260
const auto & columns = collector.getColumns();
257261

258262
query_node.resolveProjectionColumns(columns);
@@ -262,20 +266,26 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded(
262266
column_nodes_to_select->getNodes().emplace_back(std::make_shared<ColumnNode>(column, table_function_node));
263267
query_node.getProjectionNode() = column_nodes_to_select;
264268

265-
// Left only table function to send on cluster nodes
266-
modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed);
269+
if (has_local_columns_in_where)
270+
{
271+
if (query_node.getPrewhere())
272+
removeExpressionsThatDoNotDependOnTableIdentifiers(query_node.getPrewhere(), table_function_node, context);
273+
if (query_node.getWhere())
274+
removeExpressionsThatDoNotDependOnTableIdentifiers(query_node.getWhere(), table_function_node, context);
275+
}
276+
277+
query_node.getOrderByNode() = std::make_shared<ListNode>();
278+
query_node.getGroupByNode() = std::make_shared<ListNode>();
267279

268-
need_modify = true;
269-
}
280+
if (query_tree_distributed)
281+
{
282+
// Leave only the table function to send to the cluster nodes
283+
modified_query_tree = modified_query_tree->cloneAndReplace(query_node.getJoinTree(), query_tree_distributed);
284+
}
270285

271-
if (has_local_columns_in_where)
272-
{
273-
auto & query_node = modified_query_tree->as<QueryNode &>();
274-
query_node.getWhere() = {};
286+
query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree);
275287
}
276288

277-
if (need_modify)
278-
query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree);
279289
return;
280290
}
281291
case ObjectStorageClusterJoinMode::GLOBAL:
@@ -530,12 +540,16 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
530540
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table or table function node");
531541

532542
auto & query_node = query_info.query_tree->as<QueryNode &>();
533-
if (query_node.hasWhere())
543+
if (query_node.hasWhere() || query_node.hasPrewhere())
534544
{
535545
CollectUsedColumnsForSourceVisitor collector_where(table_function_node, context, true);
536-
collector_where.visit(query_node.getWhere());
546+
if (query_node.hasPrewhere())
547+
collector_where.visit(query_node.getPrewhere());
548+
if (query_node.hasWhere())
549+
collector_where.visit(query_node.getWhere());
537550

538-
// Can't use 'WHERE' on remote node if it contains columns from other sources
551+
// SELECT x FROM datalake.table WHERE x IN local.table
552+
// Need to modify 'WHERE' on remote node if it contains columns from other sources
539553
if (!collector_where.getColumns().empty())
540554
has_local_columns_in_where = true;
541555
}

tests/integration/test_s3_cluster/test.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1163,6 +1163,20 @@ def test_joins(started_cluster):
11631163
res = list(map(str.split, result5.splitlines()))
11641164
assert len(res) == 6
11651165

1166+
result6 = node.query(
1167+
f"""
1168+
SELECT name FROM
1169+
s3Cluster('cluster_simple',
1170+
'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV',
1171+
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
1172+
WHERE value IN (SELECT id FROM join_table)
1173+
ORDER BY name
1174+
SETTINGS object_storage_cluster_join_mode='local';
1175+
"""
1176+
)
1177+
res = list(map(str.split, result6.splitlines()))
1178+
assert len(res) == 25
1179+
11661180

11671181
def test_graceful_shutdown(started_cluster):
11681182
node = started_cluster.instances["s0_0_0"]

tests/integration/test_storage_iceberg/test.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3492,3 +3492,94 @@ def compare_selects(query):
34923492
compare_selects(f"SELECT _path,* FROM {creation_expression} ORDER BY ALL")
34933493
compare_selects(f"SELECT _path,* FROM {creation_expression} WHERE name_old='vasily' ORDER BY ALL")
34943494
compare_selects(f"SELECT _path,* FROM {creation_expression} WHERE ((tag + length(name_old)) % 2 = 1) ORDER BY ALL")
3495+
3496+
3497+
@pytest.mark.parametrize("storage_type", ["s3", "azure"])
3498+
def test_cluster_joins(started_cluster, storage_type):
3499+
instance = started_cluster.instances["node1"]
3500+
spark = started_cluster.spark_session
3501+
TABLE_NAME = "test_cluster_joins_" + storage_type + "_" + get_uuid_str()
3502+
TABLE_NAME_2 = "test_cluster_joins_2_" + storage_type + "_" + get_uuid_str()
3503+
3504+
def execute_spark_query(query: str, table_name):
3505+
return execute_spark_query_general(
3506+
spark,
3507+
started_cluster,
3508+
storage_type,
3509+
table_name,
3510+
query,
3511+
)
3512+
3513+
execute_spark_query(
3514+
f"""
3515+
CREATE TABLE {TABLE_NAME} (
3516+
tag INT,
3517+
name VARCHAR(50)
3518+
)
3519+
USING iceberg
3520+
OPTIONS('format-version'='2')
3521+
""", TABLE_NAME
3522+
)
3523+
3524+
execute_spark_query(
3525+
f"""
3526+
INSERT INTO {TABLE_NAME} VALUES
3527+
(1, 'john'),
3528+
(2, 'jack')
3529+
""", TABLE_NAME
3530+
)
3531+
3532+
execute_spark_query(
3533+
f"""
3534+
CREATE TABLE {TABLE_NAME_2} (
3535+
id INT,
3536+
second_name VARCHAR(50)
3537+
)
3538+
USING iceberg
3539+
OPTIONS('format-version'='2')
3540+
""", TABLE_NAME_2
3541+
)
3542+
3543+
execute_spark_query(
3544+
f"""
3545+
INSERT INTO {TABLE_NAME_2} VALUES
3546+
(1, 'dow'),
3547+
(2, 'sparrow')
3548+
""", TABLE_NAME_2
3549+
)
3550+
3551+
creation_expression = get_creation_expression(
3552+
storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=True
3553+
)
3554+
3555+
creation_expression_2 = get_creation_expression(
3556+
storage_type, TABLE_NAME_2, started_cluster, table_function=True, run_on_cluster=True
3557+
)
3558+
3559+
res = instance.query(
3560+
f"""
3561+
SELECT t1.name,t2.second_name
3562+
FROM {creation_expression} AS t1
3563+
JOIN {creation_expression_2} AS t2
3564+
ON t1.tag=t2.id
3565+
ORDER BY ALL
3566+
SETTINGS object_storage_cluster_join_mode='local'
3567+
"""
3568+
)
3569+
3570+
assert res == "jack\tsparrow\njohn\tdow\n"
3571+
3572+
res = instance.query(
3573+
f"""
3574+
SELECT name
3575+
FROM {creation_expression}
3576+
WHERE tag in (
3577+
SELECT id
3578+
FROM {creation_expression_2}
3579+
)
3580+
ORDER BY ALL
3581+
SETTINGS object_storage_cluster_join_mode='local'
3582+
"""
3583+
)
3584+
3585+
assert res == "jack\njohn\n"

0 commit comments

Comments
 (0)