-
Notifications
You must be signed in to change notification settings - Fork 351
partition field names validation against schema field conflicts #2305
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
In Java all partition-schema validation goes through https://github.com/apache/iceberg/blob/4dbc7f578eee7ceb9def35ebfa1a4cc236fb598f/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L392-L416 during table creation with partition specs, partition spec updates and also during schema evolution.
Are these the correct locations for the validation logic, or should they be placed elsewhere? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your work on this!
To improve readability and keep related code together, what are your thoughts on placing all the partition validation logic inside the partitioning.py
file? Centralizing it there could make the validation process easier for future contributors to find and understand.
Let me know what you think! @kevinjqliu
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the PR! I left a few comments. I like how we check for conflict for both changes to the PartitionSpec and changes to the Schema
I've double checked that there are only 2 places that modifies PartitionSpec, assign_fresh_partition_spec_ids
and UpdateSpec._apply
and we covered both with tests :)
Similarly we cover the 1 place that modifies Schema in UpdateSchema._apply
I think both java and rust lack the test to check PartitionSpec for conflict when the Schema is changed
def _create_table_with_schema( | ||
catalog: Catalog, schema: Schema, format_version: str, partition_spec: Optional[PartitionSpec] = None | ||
) -> Table: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
following other create table helpers in tests, for example
iceberg-python/tests/integration/test_register_table.py
Lines 40 to 59 in 8013545
def _create_table( | |
session_catalog: Catalog, | |
identifier: str, | |
format_version: int, | |
location: str, | |
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, | |
schema: Schema = TABLE_SCHEMA, | |
) -> Table: | |
try: | |
session_catalog.drop_table(identifier=identifier) | |
except NoSuchTableError: | |
pass | |
return session_catalog.create_table( | |
identifier=identifier, | |
schema=schema, | |
location=location, | |
properties={"format-version": str(format_version)}, | |
partition_spec=partition_spec, | |
) |
def _create_table_with_schema( | |
catalog: Catalog, schema: Schema, format_version: str, partition_spec: Optional[PartitionSpec] = None | |
) -> Table: | |
def _create_table_with_schema( | |
catalog: Catalog, schema: Schema, format_version: str, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC | |
) -> Table: |
if partition_spec: | ||
return catalog.create_table( | ||
identifier=tbl_name, schema=schema, partition_spec=partition_spec, properties={"format-version": format_version} | ||
) | ||
return catalog.create_table(identifier=tbl_name, schema=schema, properties={"format-version": format_version}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and then we can just do this
if partition_spec: | |
return catalog.create_table( | |
identifier=tbl_name, schema=schema, partition_spec=partition_spec, properties={"format-version": format_version} | |
) | |
return catalog.create_table(identifier=tbl_name, schema=schema, properties={"format-version": format_version}) | |
return catalog.create_table( | |
identifier=tbl_name, schema=schema, partition_spec=partition_spec, properties={"format-version": format_version} | |
) |
pyiceberg/partitioning.py
Outdated
return # No conflict if field doesn't exist in schema | ||
|
||
if isinstance(partition_transform, (IdentityTransform, VoidTransform)): | ||
# For identity transforms, allow conflict only if sourced from the same schema field |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# For identity transforms, allow conflict only if sourced from the same schema field | |
# For identity and void transforms, allow conflict only if sourced from the same schema field |
pyiceberg/partitioning.py
Outdated
raise ValueError(f"Cannot create identity partition from a different source field in the schema: {field_name}") | ||
else: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
match java error message
raise ValueError(f"Cannot create identity partition from a different source field in the schema: {field_name}") | |
else: | |
raise ValueError(f"Cannot create identity partition sourced from different field in schema: {field_name}") | |
else: |
pyiceberg/table/update/spec.py
Outdated
) -> None: | ||
from pyiceberg.partitioning import validate_partition_name | ||
|
||
validate_partition_name(name, transform, source_id, schema) | ||
if not name: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wdyt about moving L183-L186 into the validate_partition_name
to mirror the java impl
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can do that
_check_and_add_partition_name( | ||
self._transaction.table_metadata.schema(), | ||
added_field.name, | ||
added_field.source_id, | ||
added_field.transform, | ||
partition_names, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch. just to confirm this covers the newly added partition fields?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, that's correct
pyiceberg/table/update/schema.py
Outdated
if self._transaction is not None: | ||
from pyiceberg.partitioning import validate_partition_name | ||
|
||
for spec in self._transaction.table_metadata.partition_specs: | ||
for partition_field in spec.fields: | ||
validate_partition_name( | ||
partition_field.name, partition_field.transform, partition_field.source_id, new_schema | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think there should always be a self._transaction
if self._transaction is not None: | |
from pyiceberg.partitioning import validate_partition_name | |
for spec in self._transaction.table_metadata.partition_specs: | |
for partition_field in spec.fields: | |
validate_partition_name( | |
partition_field.name, partition_field.transform, partition_field.source_id, new_schema | |
) | |
from pyiceberg.partitioning import validate_partition_name | |
for spec in self._transaction.table_metadata.partition_specs: | |
for partition_field in spec.fields: | |
validate_partition_name( | |
partition_field.name, partition_field.transform, partition_field.source_id, new_schema | |
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay, I'll do the suggested changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some tests show that transaction can be None in some cases, (after removing the check, tests from test_schema.py are failing). They use: UpdateSchema(transaction=None, schema=Schema())
https://github.com/rutb327/iceberg-python/blob/24b12ddd8fdab4a62650786a2c3cdd56a53f8719/tests/test_schema.py#L933
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like everywhere else in the codebase we include transaction in UpdateSchema.
Maybe we can update the tests like this
def test_add_top_level_primitives(primitive_fields: List[NestedField], table_v2: Table) -> None:
for primitive_field in primitive_fields:
new_schema = Schema(primitive_field)
applied = UpdateSchema(transaction=Transaction(table_v2), schema=Schema()).union_by_name(new_schema)._apply() # type: ignore
assert applied == new_schema
I opened apache/iceberg#13833 and apache/iceberg-rust#1609 for checking for name conflict during schema update |
pyiceberg/table/update/schema.py
Outdated
if self._transaction is not None: | ||
from pyiceberg.partitioning import validate_partition_name | ||
|
||
for spec in self._transaction.table_metadata.partition_specs: | ||
for partition_field in spec.fields: | ||
validate_partition_name( | ||
partition_field.name, partition_field.transform, partition_field.source_id, new_schema | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like everywhere else in the codebase we include transaction in UpdateSchema.
Maybe we can update the tests like this
def test_add_top_level_primitives(primitive_fields: List[NestedField], table_v2: Table) -> None:
for primitive_field in primitive_fields:
new_schema = Schema(primitive_field)
applied = UpdateSchema(transaction=Transaction(table_v2), schema=Schema()).union_by_name(new_schema)._apply() # type: ignore
assert applied == new_schema
Co-authored-by: Fokko Driesprong <[email protected]>
Let's move this forward, thanks @rutb327 for working on this, and thanks @kevinjqliu and @dingo4dev for the review 🙌 |
Closes #2272
Collaborator: @geruh
Rationale for this change
Implements the validation logic described in #2272 to match Java and Rust behavior for partition field name conflicts with schema fields.
This mirrors the method in Java checkAndAddPartitionName():
https://github.com/apache/iceberg/blob/4dbc7f578eee7ceb9def35ebfa1a4cc236fb598f/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L392-L416
Identity transforms (
sourceColumnID != null
)- Allow schema field name conflicts only when sourced form the same fieldNon-identity (
sourceColumnID == null
)- Disallow any schema field name conflicts.In this PR
isinstance(transform, (IdentityTransform, VoidTransform))
is used to achieve the same logic as Java’ssourceColumnID
check.Are these changes tested?
Yes, all existing tests pass and added a test covering validation scenarios.
Are there any user-facing changes?
Yes. Non-identity transforms can no longer use schema field names as partition field names.