Commit ed20ac7

More tests
1 parent 3144857 commit ed20ac7

3 files changed: 29 additions & 11 deletions


tests/integration/helpers/iceberg_utils.py

Lines changed: 13 additions & 1 deletion
@@ -291,7 +291,7 @@ def get_creation_expression(
     if run_on_cluster:
         assert table_function
         return f"""
-            iceberg{engine_part}Cluster('cluster_single_node', {storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
+            iceberg{engine_part}Cluster('cluster_simple', {storage_arg}, path = '/iceberg_data/default/{table_name}/', format={format})
         """
     else:
         if table_function:
@@ -477,6 +477,17 @@ def default_upload_directory(
     raise Exception(f"Unknown iceberg storage type: {storage_type}")


+def additional_upload_directory(
+    started_cluster, node, storage_type, local_path, remote_path, **kwargs
+):
+    if storage_type == "local":
+        return LocalUploader(started_cluster.instances[node]).upload_directory(
+            local_path, remote_path, **kwargs
+        )
+    else:
+        raise Exception(f"Unknown iceberg storage type for additional uploading: {storage_type}")
+
+
 def default_download_directory(
     started_cluster, storage_type, remote_path, local_path, **kwargs
 ):
@@ -500,6 +511,7 @@ def execute_spark_query_general(
     )
     return

+
 def get_last_snapshot(path_to_table):
     import json
     import os
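
For context: with the "local" storage type nothing is shared between replicas, so a table written onto one node's disk is invisible to the others until its directory is copied over. A minimal usage sketch of the new helper (the node name and table path are illustrative; started_cluster is the usual integration-test fixture):

    # Sketch only: mirror a locally stored Iceberg table onto node2 so that
    # a cluster table function can read it from every replica.
    from helpers.iceberg_utils import additional_upload_directory

    table_dir = "/iceberg_data/default/some_table/"  # illustrative path

    additional_upload_directory(
        started_cluster,  # integration-test cluster fixture
        "node2",          # target node name (assumed)
        "local",          # the only storage type the helper accepts
        table_dir,        # source directory
        table_dir,        # destination path inside the node
    )

The test change below does exactly this in a loop over every instance except node1.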

tests/integration/test_storage_iceberg/configs/config.d/cluster.xml

Lines changed: 0 additions & 8 deletions
@@ -16,13 +16,5 @@
             </replica>
         </shard>
     </cluster_simple>
-    <cluster_single_node>
-        <shard>
-            <replica>
-                <host>node1</host>
-                <port>9000</port>
-            </replica>
-        </shard>
-    </cluster_single_node>
 </remote_servers>
</clickhouse>
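
Since test_cluster_table_function now always targets cluster_simple, the one-replica cluster_single_node definition has no remaining users and is dropped. An illustrative way to confirm from a test that only the expected cluster is configured (not part of the commit; assumes the standard node fixture named instance):

    # Sketch only: list the clusters a node knows about via system.clusters.
    clusters = instance.query(
        "SELECT DISTINCT cluster FROM system.clusters"
    ).strip().splitlines()
    assert "cluster_simple" in clusters
    assert "cluster_single_node" not in clusters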

tests/integration/test_storage_iceberg/test.py

Lines changed: 16 additions & 2 deletions
@@ -38,6 +38,7 @@

 from helpers.iceberg_utils import (
     default_upload_directory,
+    additional_upload_directory,
     default_download_directory,
     execute_spark_query_general,
     get_creation_expression,
@@ -410,7 +411,7 @@ def count_secondary_subqueries(started_cluster, query_id, expected, comment):


 @pytest.mark.parametrize("format_version", ["1", "2"])
-@pytest.mark.parametrize("storage_type", ["s3", "azure"])
+@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
 def test_cluster_table_function(started_cluster, format_version, storage_type):

     instance = started_cluster.instances["node1"]
@@ -443,6 +444,19 @@ def add_df(mode):

         logging.info(f"Adding another dataframe. result files: {files}")

+        if storage_type == "local":
+            # For local storage the data has to be uploaded to each node
+            for node_name, replica in started_cluster.instances.items():
+                if node_name == "node1":
+                    continue
+                additional_upload_directory(
+                    started_cluster,
+                    node_name,
+                    storage_type,
+                    f"/iceberg_data/default/{TABLE_NAME}/",
+                    f"/iceberg_data/default/{TABLE_NAME}/",
+                )
+
         return files

     files = add_df(mode="overwrite")
@@ -480,7 +494,7 @@ def make_query_from_function(
             storage_type_in_named_collection=storage_type_in_named_collection,
         )
         query_id = str(uuid.uuid4())
-        settings = "SETTINGS object_storage_cluster='cluster_simple'" if alt_syntax else ""
+        settings = "SETTINGS object_storage_cluster='cluster_simple'" if (alt_syntax and not run_on_cluster) else ""
         if remote:
             query = f"SELECT * FROM remote('node2', {expr}) {settings}"
         else:
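
The added "not run_on_cluster" condition avoids naming the cluster twice: when run_on_cluster is set, get_creation_expression already emits an iceberg…Cluster('cluster_simple', …) call, so the alternative-syntax SETTINGS clause is only needed for the plain table function. A small illustrative sketch of the guard (not from the commit):

    # Sketch only: the SETTINGS clause is appended in exactly one case.
    for alt_syntax in (False, True):
        for run_on_cluster in (False, True):
            settings = (
                "SETTINGS object_storage_cluster='cluster_simple'"
                if (alt_syntax and not run_on_cluster)
                else ""
            )
            print(alt_syntax, run_on_cluster, repr(settings))
    # Only (alt_syntax=True, run_on_cluster=False) produces the clause.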
