
Commit 4e9ac7e

Add test for tables
1 parent 43825ba commit 4e9ac7e

File tree

  • tests/integration/test_database_iceberg

1 file changed: +86, -1 lines changed

tests/integration/test_database_iceberg/test.py

Lines changed: 86 additions & 1 deletion
@@ -14,19 +14,21 @@
 import pytz
 from minio import Minio
 from pyiceberg.catalog import load_catalog
-from pyiceberg.partitioning import PartitionField, PartitionSpec
+from pyiceberg.partitioning import PartitionField, PartitionSpec, UNPARTITIONED_PARTITION_SPEC
 from pyiceberg.schema import Schema
 from pyiceberg.table.sorting import SortField, SortOrder
 from pyiceberg.transforms import DayTransform, IdentityTransform
 from pyiceberg.types import (
     DoubleType,
+    LongType,
     FloatType,
     NestedField,
     StringType,
     StructType,
     TimestampType,
     TimestamptzType
 )
+from pyiceberg.table.sorting import UNSORTED_SORT_ORDER
 
 from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm
 from helpers.config_cluster import minio_secret_key, minio_access_key
@@ -609,3 +611,86 @@ def test_table_with_slash(started_cluster):
     create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
     node.query(f"INSERT INTO {CATALOG_NAME}.`{root_namespace}.{table_encoded_name}` VALUES (NULL, 'AAPL', 193.24, 193.31, tuple('bot'));", settings={"allow_experimental_insert_into_iceberg": 1, 'write_full_path_in_iceberg_metadata': 1})
     assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_encoded_name}`") == "\\N\tAAPL\t193.24\t193.31\t('bot')\n"
+
+
+def test_cluster_joins(started_cluster):
+    node = started_cluster.instances["node1"]
+
+    test_ref = f"test_join_tables_{uuid.uuid4()}"
+    table_name = f"{test_ref}_table"
+    table_name_2 = f"{test_ref}_table_2"
+
+    root_namespace = f"{test_ref}_namespace"
+
+    catalog = load_catalog_impl(started_cluster)
+    catalog.create_namespace(root_namespace)
+
+    schema = Schema(
+        NestedField(
+            field_id=1,
+            name="tag",
+            field_type=LongType(),
+            required=False
+        ),
+        NestedField(
+            field_id=2,
+            name="name",
+            field_type=StringType(),
+            required=False,
+        ),
+    )
+    table = create_table(catalog, root_namespace, table_name, schema,
+                         partition_spec=UNPARTITIONED_PARTITION_SPEC, sort_order=UNSORTED_SORT_ORDER)
+    data = [{"tag": 1, "name": "John"}, {"tag": 2, "name": "Jack"}]
+    df = pa.Table.from_pylist(data)
+    table.append(df)
+
+    schema2 = Schema(
+        NestedField(
+            field_id=1,
+            name="id",
+            field_type=LongType(),
+            required=False
+        ),
+        NestedField(
+            field_id=2,
+            name="second_name",
+            field_type=StringType(),
+            required=False,
+        ),
+    )
+    table2 = create_table(catalog, root_namespace, table_name_2, schema2,
+                          partition_spec=UNPARTITIONED_PARTITION_SPEC, sort_order=UNSORTED_SORT_ORDER)
+    data = [{"id": 1, "second_name": "Dow"}, {"id": 2, "second_name": "Sparrow"}]
+    df = pa.Table.from_pylist(data)
+    table2.append(df)
+
+    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
+
+    res = node.query(
+        f"""
+        SELECT t1.name,t2.second_name
+        FROM {CATALOG_NAME}.`{root_namespace}.{table_name}` AS t1
+        JOIN {CATALOG_NAME}.`{root_namespace}.{table_name_2}` AS t2
+        ON t1.tag=t2.id
+        ORDER BY ALL
+        SETTINGS object_storage_cluster_join_mode='local'
+        """
+    )
+
+    assert res == "Jack\tSparrow\nJohn\tDow\n"
+
+    res = node.query(
+        f"""
+        SELECT name
+        FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`
+        WHERE tag in (
+            SELECT id
+            FROM {CATALOG_NAME}.`{root_namespace}.{table_name_2}`
+        )
+        ORDER BY ALL
+        SETTINGS object_storage_cluster_join_mode='local'
+        """
+    )
+
+    assert res == "Jack\nJohn\n"
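
Note for readers without the full file: the new test relies on helpers defined earlier in test.py that are not part of this diff, in particular create_table and load_catalog_impl. The sketch below shows one way a create_table helper with this call shape could be written on top of pyiceberg's Catalog.create_table; the parameter names and the "<namespace>.<table>" identifier format are assumptions for illustration, not the repository's actual implementation (which may, for example, also pass an explicit S3 location or table properties).

# Minimal sketch of a create_table helper matching the call shape used in
# test_cluster_joins. Assumption for illustration only; the real helper lives
# earlier in test.py and may differ.
from pyiceberg.catalog import Catalog
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER


def create_table(
    catalog: Catalog,
    namespace: str,
    table: str,
    schema: Schema,
    partition_spec=UNPARTITIONED_PARTITION_SPEC,
    sort_order=UNSORTED_SORT_ORDER,
) -> Table:
    # pyiceberg addresses tables as "<namespace>.<table>" within a catalog.
    return catalog.create_table(
        identifier=f"{namespace}.{table}",
        schema=schema,
        partition_spec=partition_spec,
        sort_order=sort_order,
    )

With a helper of that shape, the Arrow tables built with pa.Table.from_pylist should append cleanly: from_pylist infers int64 and string columns, which line up with the LongType and StringType fields declared in the two Iceberg schemas.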
