Skip to content

Commit 8d72946

Browse files
committed
Fix confusion between the cluster name and the named collection name in cluster functions
1 parent 49be26b commit 8d72946

File tree

3 files changed

+19
-6
lines changed

3 files changed

+19
-6
lines changed

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,15 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
396396
}
397397

398398
ASTPtr object_storage_type_arg;
399-
configuration->extractDynamicStorageType(args, context, &object_storage_type_arg);
399+
if (cluster_name_in_settings)
400+
configuration->extractDynamicStorageType(args, context, &object_storage_type_arg);
401+
else
402+
{
403+
auto args_copy = args;
404+
// Remove cluster name from args to avoid confusing cluster name and named collection name
405+
args_copy.erase(args_copy.begin());
406+
configuration->extractDynamicStorageType(args_copy, context, &object_storage_type_arg);
407+
}
400408
ASTPtr settings_temporary_storage = nullptr;
401409
for (auto * it = args.begin(); it != args.end(); ++it)
402410
{

tests/integration/helpers/iceberg_utils.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ def get_creation_expression(
196196
explicit_metadata_path="",
197197
storage_type_as_arg=False,
198198
storage_type_in_named_collection=False,
199+
cluster_name_as_literal=True,
199200
additional_settings = [],
200201
**kwargs,
201202
):
@@ -224,6 +225,8 @@ def get_creation_expression(
224225
else:
225226
settings_expression = ""
226227

228+
cluster_name = "'cluster_simple'" if cluster_name_as_literal else "cluster_simple"
229+
227230
storage_arg = storage_type
228231
engine_part = ""
229232
if (storage_type_in_named_collection):
@@ -252,7 +255,7 @@ def get_creation_expression(
252255

253256
if run_on_cluster:
254257
assert table_function
255-
return f"iceberg{engine_part}Cluster('cluster_simple', {storage_arg}, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
258+
return f"iceberg{engine_part}Cluster({cluster_name}, {storage_arg}, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
256259
else:
257260
if table_function:
258261
return f"iceberg{engine_part}({storage_arg}, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
@@ -271,7 +274,7 @@ def get_creation_expression(
271274
if run_on_cluster:
272275
assert table_function
273276
return f"""
274-
iceberg{engine_part}Cluster('cluster_simple', {storage_arg}, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
277+
iceberg{engine_part}Cluster({cluster_name}, {storage_arg}, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
275278
"""
276279
else:
277280
if table_function:
@@ -293,7 +296,7 @@ def get_creation_expression(
293296
if run_on_cluster:
294297
assert table_function
295298
return f"""
296-
iceberg{engine_part}Cluster('cluster_simple', {storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
299+
iceberg{engine_part}Cluster({cluster_name}, {storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
297300
"""
298301
else:
299302
if table_function:

tests/integration/test_storage_iceberg/test.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,8 @@ def count_secondary_subqueries(started_cluster, query_id, expected, comment):
412412

413413
@pytest.mark.parametrize("format_version", ["1", "2"])
414414
@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
415-
def test_cluster_table_function(started_cluster, format_version, storage_type):
415+
@pytest.mark.parametrize("cluster_name_as_literal", [True, False])
416+
def test_cluster_table_function(started_cluster, format_version, storage_type, cluster_name_as_literal):
416417

417418
instance = started_cluster.instances["node1"]
418419
spark = started_cluster.spark_session
@@ -471,7 +472,7 @@ def add_df(mode):
471472

472473
# Regular Query only node1
473474
table_function_expr = get_creation_expression(
474-
storage_type, TABLE_NAME, started_cluster, table_function=True
475+
storage_type, TABLE_NAME, started_cluster, table_function=True, cluster_name_as_literal=cluster_name_as_literal
475476
)
476477
select_regular = (
477478
instance.query(f"SELECT * FROM {table_function_expr}").strip().split()
@@ -492,6 +493,7 @@ def make_query_from_function(
492493
run_on_cluster=run_on_cluster,
493494
storage_type_as_arg=storage_type_as_arg,
494495
storage_type_in_named_collection=storage_type_in_named_collection,
496+
cluster_name_as_literal=cluster_name_as_literal,
495497
)
496498
query_id = str(uuid.uuid4())
497499
settings = f"SETTINGS object_storage_cluster='cluster_simple'" if (alt_syntax and not run_on_cluster) else ""

0 commit comments

Comments
 (0)