Skip to content

Commit bba5b98

Browse files
authored
Merge pull request #1168 from Altinity/bugfix/antalya-25.8/fix_in_for_cluster_request
Fix IN with Iceberg table
2 parents 1a1d51a + cc35886 commit bba5b98

File tree

6 files changed

+114
-80
lines changed

6 files changed

+114
-80
lines changed

src/Interpreters/PreparedSets.cpp

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,12 @@ FutureSetFromSubquery::FutureSetFromSubquery(
172172
FutureSetFromSubquery::~FutureSetFromSubquery() = default;
173173

174174
SetPtr FutureSetFromSubquery::get() const
175+
{
176+
std::lock_guard lock(mutex);
177+
return get_unsafe();
178+
}
179+
180+
SetPtr FutureSetFromSubquery::get_unsafe() const
175181
{
176182
if (set_and_key->set != nullptr && set_and_key->set->isCreated())
177183
return set_and_key->set;
@@ -181,20 +187,32 @@ SetPtr FutureSetFromSubquery::get() const
181187

182188
void FutureSetFromSubquery::setQueryPlan(std::unique_ptr<QueryPlan> source_)
183189
{
190+
std::lock_guard lock(mutex);
184191
source = std::move(source_);
185192
set_and_key->set->setHeader(source->getCurrentHeader()->getColumnsWithTypeAndName());
186193
}
187194

188-
void FutureSetFromSubquery::setExternalTable(StoragePtr external_table_) { external_table = std::move(external_table_); }
195+
void FutureSetFromSubquery::setExternalTable(StoragePtr external_table_)
196+
{
197+
std::lock_guard lock(mutex);
198+
external_table = std::move(external_table_);
199+
}
189200

190201
DataTypes FutureSetFromSubquery::getTypes() const
191202
{
203+
std::lock_guard lock(mutex);
192204
return set_and_key->set->getElementsTypes();
193205
}
194206

195207
FutureSet::Hash FutureSetFromSubquery::getHash() const { return hash; }
196208

197209
std::unique_ptr<QueryPlan> FutureSetFromSubquery::build(const SizeLimits & network_transfer_limits, const PreparedSetsCachePtr & prepared_sets_cache)
210+
{
211+
std::lock_guard lock(mutex);
212+
return build_unsafe(network_transfer_limits, prepared_sets_cache);
213+
}
214+
215+
std::unique_ptr<QueryPlan> FutureSetFromSubquery::build_unsafe(const SizeLimits & network_transfer_limits, const PreparedSetsCachePtr & prepared_sets_cache)
198216
{
199217
if (set_and_key->set->isCreated())
200218
return nullptr;
@@ -217,14 +235,16 @@ std::unique_ptr<QueryPlan> FutureSetFromSubquery::build(const SizeLimits & netwo
217235

218236
void FutureSetFromSubquery::buildSetInplace(const ContextPtr & context)
219237
{
238+
std::lock_guard lock(mutex);
239+
220240
if (external_table_set)
221241
external_table_set->buildSetInplace(context);
222242

223243
const auto & settings = context->getSettingsRef();
224244
SizeLimits network_transfer_limits(settings[Setting::max_rows_to_transfer], settings[Setting::max_bytes_to_transfer], settings[Setting::transfer_overflow_mode]);
225245
auto prepared_sets_cache = context->getPreparedSetsCache();
226246

227-
auto plan = build(network_transfer_limits, prepared_sets_cache);
247+
auto plan = build_unsafe(network_transfer_limits, prepared_sets_cache);
228248

229249
if (!plan)
230250
return;
@@ -242,7 +262,9 @@ SetPtr FutureSetFromSubquery::buildOrderedSetInplace(const ContextPtr & context)
242262
if (!context->getSettingsRef()[Setting::use_index_for_in_with_subqueries])
243263
return nullptr;
244264

245-
if (auto set = get())
265+
std::lock_guard lock(mutex);
266+
267+
if (auto set = get_unsafe())
246268
{
247269
if (set->hasExplicitSetElements())
248270
return set;
@@ -264,7 +286,7 @@ SetPtr FutureSetFromSubquery::buildOrderedSetInplace(const ContextPtr & context)
264286
SizeLimits network_transfer_limits(settings[Setting::max_rows_to_transfer], settings[Setting::max_bytes_to_transfer], settings[Setting::transfer_overflow_mode]);
265287
auto prepared_sets_cache = context->getPreparedSetsCache();
266288

267-
auto plan = build(network_transfer_limits, prepared_sets_cache);
289+
auto plan = build_unsafe(network_transfer_limits, prepared_sets_cache);
268290
if (!plan)
269291
return nullptr;
270292

src/Interpreters/PreparedSets.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,11 @@ class FutureSetFromSubquery final : public FutureSet
170170
QueryPlan * getQueryPlan() { return source.get(); }
171171

172172
private:
173+
SetPtr get_unsafe() const;
174+
std::unique_ptr<QueryPlan> build_unsafe(
175+
const SizeLimits & network_transfer_limits,
176+
const PreparedSetsCachePtr & prepared_sets_cache);
177+
173178
Hash hash;
174179
ASTPtr ast;
175180
SetAndKeyPtr set_and_key;
@@ -178,6 +183,8 @@ class FutureSetFromSubquery final : public FutureSet
178183

179184
std::unique_ptr<QueryPlan> source;
180185
QueryTreeNodePtr query_tree;
186+
187+
mutable std::mutex mutex;
181188
};
182189

183190
using FutureSetFromSubqueryPtr = std::shared_ptr<FutureSetFromSubquery>;

src/Storages/IStorageCluster.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,7 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
573573
"object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true");
574574

575575
auto info = getQueryTreeInfo(query_info.query_tree, context);
576-
if (info.has_join || info.has_cross_join /*|| info.has_local_columns_in_where*/)
576+
if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
577577
return QueryProcessingStage::Enum::FetchColumns;
578578
}
579579

tests/integration/test_database_iceberg/test.py

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -734,22 +734,22 @@ def test_cluster_joins(started_cluster):
734734

735735
assert res == "Jack\tSparrow\nJohn\tDow\n"
736736

737-
#res = node.query(
738-
# f"""
739-
# SELECT name
740-
# FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`
741-
# WHERE tag in (
742-
# SELECT id
743-
# FROM {CATALOG_NAME}.`{root_namespace}.{table_name_2}`
744-
# )
745-
# ORDER BY ALL
746-
# SETTINGS
747-
# object_storage_cluster='cluster_simple',
748-
# object_storage_cluster_join_mode='local'
749-
# """
750-
#)
751-
752-
#assert res == "Jack\nJohn\n"
737+
res = node.query(
738+
f"""
739+
SELECT name
740+
FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`
741+
WHERE tag in (
742+
SELECT id
743+
FROM {CATALOG_NAME}.`{root_namespace}.{table_name_2}`
744+
)
745+
ORDER BY ALL
746+
SETTINGS
747+
object_storage_cluster='cluster_simple',
748+
object_storage_cluster_join_mode='local'
749+
"""
750+
)
751+
752+
assert res == "Jack\nJohn\n"
753753

754754
res = node.query(
755755
f"""
@@ -767,22 +767,22 @@ def test_cluster_joins(started_cluster):
767767

768768
assert res == "Jack\tBlack\nJohn\tSilver\n"
769769

770-
#res = node.query(
771-
# f"""
772-
# SELECT name
773-
# FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`
774-
# WHERE tag in (
775-
# SELECT id
776-
# FROM `{table_name_local}`
777-
# )
778-
# ORDER BY ALL
779-
# SETTINGS
780-
# object_storage_cluster='cluster_simple',
781-
# object_storage_cluster_join_mode='local'
782-
# """
783-
#)
784-
785-
#assert res == "Jack\nJohn\n"
770+
res = node.query(
771+
f"""
772+
SELECT name
773+
FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`
774+
WHERE tag in (
775+
SELECT id
776+
FROM `{table_name_local}`
777+
)
778+
ORDER BY ALL
779+
SETTINGS
780+
object_storage_cluster='cluster_simple',
781+
object_storage_cluster_join_mode='local'
782+
"""
783+
)
784+
785+
assert res == "Jack\nJohn\n"
786786

787787
res = node.query(
788788
f"""

tests/integration/test_s3_cluster/test.py

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1049,6 +1049,7 @@ def test_joins(started_cluster):
10491049
node = started_cluster.instances["s0_0_0"]
10501050

10511051
# Table join_table only exists on the node 's0_0_0'.
1052+
node.query("DROP TABLE IF EXISTS join_table SYNC")
10521053
node.query(
10531054
"""
10541055
CREATE TABLE IF NOT EXISTS join_table (
@@ -1163,19 +1164,19 @@ def test_joins(started_cluster):
11631164
res = list(map(str.split, result5.splitlines()))
11641165
assert len(res) == 6
11651166

1166-
#result6 = node.query(
1167-
# f"""
1168-
# SELECT name FROM
1169-
# s3Cluster('cluster_simple',
1170-
# 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV',
1171-
# 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
1172-
# WHERE value IN (SELECT id FROM join_table)
1173-
# ORDER BY name
1174-
# SETTINGS object_storage_cluster_join_mode='local';
1175-
# """
1176-
#)
1177-
#res = list(map(str.split, result6.splitlines()))
1178-
#assert len(res) == 25
1167+
result6 = node.query(
1168+
f"""
1169+
SELECT name FROM
1170+
s3Cluster('cluster_simple',
1171+
'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV',
1172+
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
1173+
WHERE value IN (SELECT id FROM join_table)
1174+
ORDER BY name
1175+
SETTINGS object_storage_cluster_join_mode='local';
1176+
"""
1177+
)
1178+
res = list(map(str.split, result6.splitlines()))
1179+
assert len(res) == 25
11791180

11801181

11811182
def test_graceful_shutdown(started_cluster):

tests/integration/test_storage_iceberg/test.py

Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3551,6 +3551,10 @@ def execute_spark_query(query: str):
35513551

35523552
# Warm up metadata cache
35533553
for replica in started_cluster.instances.values():
3554+
replica.query("SYSTEM DROP UNCOMPRESSED CACHE")
3555+
replica.query("SYSTEM DROP QUERY CACHE")
3556+
replica.query("SYSTEM DROP FILESYSTEM CACHE")
3557+
replica.query("SYSTEM DROP ICEBERG METADATA CACHE")
35543558
replica.query(f"SELECT * FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0")
35553559

35563560
all_data_expected_query_id = str(uuid.uuid4())
@@ -3738,22 +3742,22 @@ def execute_spark_query(query: str, table_name):
37383742

37393743
assert res == "jack\tsparrow\njohn\tdow\n"
37403744

3741-
#res = instance.query(
3742-
# f"""
3743-
# SELECT name
3744-
# FROM {creation_expression}
3745-
# WHERE tag in (
3746-
# SELECT id
3747-
# FROM {creation_expression_2}
3748-
# )
3749-
# ORDER BY ALL
3750-
# SETTINGS
3751-
# object_storage_cluster='cluster_simple',
3752-
# object_storage_cluster_join_mode='local'
3753-
# """
3754-
#)
3745+
res = instance.query(
3746+
f"""
3747+
SELECT name
3748+
FROM {creation_expression}
3749+
WHERE tag in (
3750+
SELECT id
3751+
FROM {creation_expression_2}
3752+
)
3753+
ORDER BY ALL
3754+
SETTINGS
3755+
object_storage_cluster='cluster_simple',
3756+
object_storage_cluster_join_mode='local'
3757+
"""
3758+
)
37553759

3756-
#assert res == "jack\njohn\n"
3760+
assert res == "jack\njohn\n"
37573761

37583762
res = instance.query(
37593763
f"""
@@ -3771,22 +3775,22 @@ def execute_spark_query(query: str, table_name):
37713775

37723776
assert res == "jack\tblack\njohn\tsilver\n"
37733777

3774-
#res = instance.query(
3775-
# f"""
3776-
# SELECT name
3777-
# FROM {creation_expression}
3778-
# WHERE tag in (
3779-
# SELECT id
3780-
# FROM `{TABLE_NAME_LOCAL}`
3781-
# )
3782-
# ORDER BY ALL
3783-
# SETTINGS
3784-
# object_storage_cluster='cluster_simple',
3785-
# object_storage_cluster_join_mode='local'
3786-
# """
3787-
#)
3778+
res = instance.query(
3779+
f"""
3780+
SELECT name
3781+
FROM {creation_expression}
3782+
WHERE tag in (
3783+
SELECT id
3784+
FROM `{TABLE_NAME_LOCAL}`
3785+
)
3786+
ORDER BY ALL
3787+
SETTINGS
3788+
object_storage_cluster='cluster_simple',
3789+
object_storage_cluster_join_mode='local'
3790+
"""
3791+
)
37883792

3789-
#assert res == "jack\njohn\n"
3793+
assert res == "jack\njohn\n"
37903794

37913795
res = instance.query(
37923796
f"""

0 commit comments

Comments
 (0)