Skip to content

Commit 284250b

Browse files
committed
partition-schema name conflict validation function added
1 parent 92a29e8 commit 284250b

File tree

5 files changed

+124
-18
lines changed

5 files changed

+124
-18
lines changed

pyiceberg/partitioning.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,26 @@ def partition_to_path(self, data: Record, schema: Schema) -> str:
249249
UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0)
250250

251251

252+
def validate_partition_name(
    field_name: str,
    partition_transform: Transform[Any, Any],
    source_id: int,
    schema: Schema,
) -> None:
    """Reject a partition field name that collides with a field of ``schema``.

    A collision is tolerated in exactly one situation: the partition uses an
    identity or void transform and the schema field carrying the same name is
    the partition's own source field.

    Args:
        field_name: Name proposed for the partition field.
        partition_transform: Transform applied by the partition field.
        source_id: Field id of the schema field the partition is sourced from.
        schema: Schema whose field names are checked for a collision.

    Raises:
        ValueError: If ``field_name`` names a schema field and the transform is
            neither identity nor void, or it is identity/void but the matching
            schema field's id differs from ``source_id``.
    """
    try:
        conflicting_field = schema.find_field(field_name)
    except ValueError:
        # The lookup failing with ValueError is treated as "no field with this name",
        # so there is nothing to collide with.
        return

    if not isinstance(partition_transform, (IdentityTransform, VoidTransform)):
        raise ValueError(f"Cannot create partition from name that exists in schema: {field_name}")

    # Identity/void partitions may reuse a schema field name only when that field is their source.
    if conflicting_field.field_id != source_id:
        raise ValueError(f"Cannot create identity partition from a different source field in the schema: {field_name}")
270+
271+
252272
def assign_fresh_partition_spec_ids(spec: PartitionSpec, old_schema: Schema, fresh_schema: Schema) -> PartitionSpec:
253273
partition_fields = []
254274
for pos, field in enumerate(spec.fields):
@@ -258,6 +278,9 @@ def assign_fresh_partition_spec_ids(spec: PartitionSpec, old_schema: Schema, fre
258278
fresh_field = fresh_schema.find_field(original_column_name)
259279
if fresh_field is None:
260280
raise ValueError(f"Could not find field in fresh schema: {original_column_name}")
281+
282+
validate_partition_name(field.name, field.transform, fresh_field.field_id, fresh_schema)
283+
261284
partition_fields.append(
262285
PartitionField(
263286
name=field.name,

pyiceberg/table/update/schema.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -658,6 +658,14 @@ def _apply(self) -> Schema:
658658

659659
# Check the field-ids
660660
new_schema = Schema(*struct.fields)
661+
if self._transaction is not None:
662+
from pyiceberg.partitioning import validate_partition_name
663+
664+
for spec in self._transaction.table_metadata.partition_specs:
665+
for partition_field in spec.fields:
666+
validate_partition_name(
667+
partition_field.name, partition_field.transform, partition_field.source_id, new_schema
668+
)
661669
field_ids = set()
662670
for name in self._identifier_field_names:
663671
try:

pyiceberg/table/update/spec.py

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -177,18 +177,9 @@ def _apply(self) -> PartitionSpec:
177177
def _check_and_add_partition_name(
178178
schema: Schema, name: str, source_id: int, transform: Transform[Any, Any], partition_names: Set[str]
179179
) -> None:
180-
try:
181-
field = schema.find_field(name)
182-
except ValueError:
183-
field = None
184-
185-
if field is not None:
186-
if isinstance(transform, (IdentityTransform, VoidTransform)):
187-
# For identity transforms allow name conflict only if sourced from the same schema field
188-
if field.field_id != source_id:
189-
raise ValueError(f"Cannot create identity partition from a different field in the schema: {name}")
190-
else:
191-
raise ValueError(f"Cannot create partition from name that exists in schema: {name}")
180+
from pyiceberg.partitioning import validate_partition_name
181+
182+
validate_partition_name(name, transform, source_id, schema)
192183
if not name:
193184
raise ValueError("Undefined name")
194185
if name in partition_names:

tests/integration/test_partition_evolution.py

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717
# pylint:disable=redefined-outer-name
18+
from typing import Optional
1819

1920
import pytest
2021

@@ -63,12 +64,19 @@ def _table_v2(catalog: Catalog) -> Table:
6364
return _create_table_with_schema(catalog, schema_with_timestamp, "2")
6465

6566

66-
def _create_table_with_schema(catalog: Catalog, schema: Schema, format_version: str) -> Table:
67+
def _create_table_with_schema(
    catalog: Catalog, schema: Schema, format_version: str, partition_spec: Optional[PartitionSpec] = None
) -> Table:
    """Drop and recreate the shared test table, optionally with a partition spec.

    Args:
        catalog: Catalog in which the table is dropped and created.
        schema: Schema for the new table.
        format_version: Value stored under the ``format-version`` table property.
        partition_spec: When truthy, forwarded to ``create_table``; otherwise the
            table is created without passing a partition spec.

    Returns:
        The table returned by ``catalog.create_table``.
    """
    tbl_name = "default.test_schema_evolution"
    try:
        catalog.drop_table(tbl_name)
    except NoSuchTableError:
        # Nothing to clean up: the table did not exist yet.
        pass

    table_properties = {"format-version": format_version}
    if not partition_spec:
        return catalog.create_table(identifier=tbl_name, schema=schema, properties=table_properties)

    return catalog.create_table(
        identifier=tbl_name, schema=schema, partition_spec=partition_spec, properties=table_properties
    )
7381

7482

@@ -582,10 +590,68 @@ def test_partition_schema_field_name_conflict(catalog: Catalog) -> None:
582590
with pytest.raises(ValueError, match="Cannot create partition from name that exists in schema: id"):
583591
table.update_spec().add_field("event_ts", DayTransform(), "id").commit()
584592

585-
with pytest.raises(ValueError, match="Cannot create identity partition from a different field in the schema: another_ts"):
593+
with pytest.raises(
594+
ValueError, match="Cannot create identity partition from a different source field in the schema: another_ts"
595+
):
586596
table.update_spec().add_field("event_ts", IdentityTransform(), "another_ts").commit()
587-
with pytest.raises(ValueError, match="Cannot create identity partition from a different field in the schema: str"):
597+
with pytest.raises(ValueError, match="Cannot create identity partition from a different source field in the schema: str"):
588598
table.update_spec().add_field("id", IdentityTransform(), "str").commit()
589599

590600
table.update_spec().add_field("id", IdentityTransform(), "id").commit()
591601
table.update_spec().add_field("event_ts", YearTransform(), "event_year").commit()
602+
603+
604+
@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_partition_validation_during_table_creation(catalog: Catalog) -> None:
    """Creating a table must reject partition names that clash with schema field names."""
    columns = [
        NestedField(1, "id", LongType(), required=False),
        NestedField(2, "event_ts", TimestampType(), required=False),
        NestedField(3, "another_ts", TimestampType(), required=False),
        NestedField(4, "str", StringType(), required=False),
    ]
    schema = Schema(*columns)

    # A year-transform partition on "event_ts" that borrows the name of the "another_ts" column is rejected.
    clashing_field = PartitionField(source_id=2, field_id=1000, transform=YearTransform(), name="another_ts")
    clashing_spec = PartitionSpec(clashing_field, spec_id=1)
    with pytest.raises(ValueError, match="Cannot create partition from name that exists in schema: another_ts"):
        _create_table_with_schema(catalog, schema, "2", clashing_spec)

    # An identity partition named after its own source column ("id", field id 1) is accepted.
    identity_field = PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="id")
    identity_spec = PartitionSpec(identity_field, spec_id=1)
    _create_table_with_schema(catalog, schema, "2", identity_spec)
624+
625+
626+
@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_schema_evolution_partition_conflict(catalog: Catalog) -> None:
    """Schema updates must not introduce column names that clash with existing partition fields."""
    # Expected error patterns, bound once because each is asserted for both add_column and rename_column.
    name_taken_pattern = "Cannot create partition from name that exists in schema: event_year"
    wrong_source_pattern = "Cannot create identity partition from a different source field in the schema: first_name"

    schema = Schema(
        NestedField(1, "id", LongType(), required=False),
        NestedField(2, "event_ts", TimestampType(), required=False),
    )
    partition_fields = [
        PartitionField(source_id=2, field_id=1000, transform=YearTransform(), name="event_year"),
        PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="first_name"),
        PartitionField(source_id=1, field_id=1002, transform=IdentityTransform(), name="id"),
    ]
    partition_spec = PartitionSpec(*partition_fields, spec_id=1)
    table = _create_table_with_schema(catalog, schema, "2", partition_spec)

    # Adding a column named like an existing partition field is rejected.
    with pytest.raises(ValueError, match=name_taken_pattern):
        table.update_schema().add_column("event_year", StringType()).commit()
    with pytest.raises(ValueError, match=wrong_source_pattern):
        table.update_schema().add_column("first_name", StringType()).commit()

    # A column whose name does not match any partition field can be added.
    table.update_schema().add_column("other_field", StringType()).commit()

    # Renaming that column onto a partition field name is rejected in the same way.
    with pytest.raises(ValueError, match=name_taken_pattern):
        table.update_schema().rename_column("other_field", "event_year").commit()
    with pytest.raises(ValueError, match=wrong_source_pattern):
        table.update_schema().rename_column("other_field", "first_name").commit()

    # Renaming to an unused name succeeds.
    table.update_schema().rename_column("other_field", "valid_name").commit()

tests/integration/test_writes/test_partitioned_writes.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -980,8 +980,16 @@ def test_append_ymd_transform_partitioned(
980980
# Given
981981
identifier = f"default.arrow_table_v{format_version}_with_{str(transform)}_partition_on_col_{part_col}"
982982
nested_field = TABLE_SCHEMA.find_field(part_col)
983+
984+
if isinstance(transform, YearTransform):
985+
partition_name = f"{part_col}_year"
986+
elif isinstance(transform, MonthTransform):
987+
partition_name = f"{part_col}_month"
988+
elif isinstance(transform, DayTransform):
989+
partition_name = f"{part_col}_day"
990+
983991
partition_spec = PartitionSpec(
984-
PartitionField(source_id=nested_field.field_id, field_id=1001, transform=transform, name=part_col)
992+
PartitionField(source_id=nested_field.field_id, field_id=1001, transform=transform, name=partition_name)
985993
)
986994

987995
# When
@@ -1037,8 +1045,18 @@ def test_append_transform_partition_verify_partitions_count(
10371045
part_col = "timestamptz"
10381046
identifier = f"default.arrow_table_v{format_version}_with_{str(transform)}_transform_partitioned_on_col_{part_col}"
10391047
nested_field = table_date_timestamps_schema.find_field(part_col)
1048+
1049+
if isinstance(transform, YearTransform):
1050+
partition_name = f"{part_col}_year"
1051+
elif isinstance(transform, MonthTransform):
1052+
partition_name = f"{part_col}_month"
1053+
elif isinstance(transform, DayTransform):
1054+
partition_name = f"{part_col}_day"
1055+
elif isinstance(transform, HourTransform):
1056+
partition_name = f"{part_col}_hour"
1057+
10401058
partition_spec = PartitionSpec(
1041-
PartitionField(source_id=nested_field.field_id, field_id=1001, transform=transform, name=part_col),
1059+
PartitionField(source_id=nested_field.field_id, field_id=1001, transform=transform, name=partition_name),
10421060
)
10431061

10441062
# When
@@ -1061,7 +1079,7 @@ def test_append_transform_partition_verify_partitions_count(
10611079

10621080
partitions_table = tbl.inspect.partitions()
10631081
assert partitions_table.num_rows == len(expected_partitions)
1064-
assert {part[part_col] for part in partitions_table["partition"].to_pylist()} == expected_partitions
1082+
assert {part[partition_name] for part in partitions_table["partition"].to_pylist()} == expected_partitions
10651083
files_df = spark.sql(
10661084
f"""
10671085
SELECT *

0 commit comments

Comments
 (0)