From c4ee6d405a7051dd4cbacae0be41e77d3c75187d Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Mon, 6 Oct 2025 14:31:02 -0700 Subject: [PATCH] Allow for date->time promotion on v3 This allows date->time promotion on v3 tables only. --- pyiceberg/io/pyarrow.py | 18 ++++++++++--- pyiceberg/schema.py | 38 ++++++++++++++++++++++----- pyiceberg/table/update/schema.py | 6 +++-- tests/conftest.py | 14 +++++++++- tests/test_schema.py | 45 ++++++++++++++++++++++++++++++++ 5 files changed, 109 insertions(+), 12 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index b6ad5659b1..b66fea787f 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1596,6 +1596,7 @@ def _task_to_record_batches( current_batch, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, projected_missing_fields=projected_missing_fields, + format_version=format_version, ) @@ -1788,13 +1789,18 @@ def _to_requested_schema( downcast_ns_timestamp_to_us: bool = False, include_field_ids: bool = False, projected_missing_fields: Dict[int, Any] = EMPTY_DICT, + format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION, ) -> pa.RecordBatch: # We could reuse some of these visitors struct_array = visit_with_partner( requested_schema, batch, ArrowProjectionVisitor( - file_schema, downcast_ns_timestamp_to_us, include_field_ids, projected_missing_fields=projected_missing_fields + file_schema, + downcast_ns_timestamp_to_us, + include_field_ids, + projected_missing_fields=projected_missing_fields, + format_version=format_version, ), ArrowAccessor(file_schema), ) @@ -1808,6 +1814,8 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra _use_large_types: Optional[bool] _projected_missing_fields: Dict[int, Any] + _format_version: TableVersion + def __init__( self, file_schema: Schema, @@ -1815,12 +1823,14 @@ def __init__( include_field_ids: bool = False, use_large_types: Optional[bool] = None, projected_missing_fields: Dict[int, Any] = EMPTY_DICT, + format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION, ) -> None: self._file_schema = file_schema self._include_field_ids = include_field_ids self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us self._use_large_types = use_large_types self._projected_missing_fields = projected_missing_fields + self._format_version = format_version if use_large_types is not None: deprecation_message( @@ -1862,7 +1872,8 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: if field.field_type != file_field.field_type: target_schema = schema_to_pyarrow( - promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids + promote(file_field.field_type, field.field_type, format_version=self._format_version), + include_field_ids=self._include_field_ids, ) if self._use_large_types is False: target_schema = _pyarrow_schema_ensure_small_types(target_schema) @@ -2568,6 +2579,7 @@ def write_parquet(task: WriteTask) -> DataFile: batch=batch, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, include_field_ids=True, + format_version=table_metadata.format_version, ) for batch in task.record_batches ] @@ -2658,7 +2670,7 @@ def _check_pyarrow_schema_compatible( raise ValueError( f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)." ) from e - _check_schema_compatible(requested_schema, provided_schema) + _check_schema_compatible(requested_schema, provided_schema, format_version=format_version) def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str]) -> Iterator[DataFile]: diff --git a/pyiceberg/schema.py b/pyiceberg/schema.py index d9c2d7ddfc..b4d5170d24 100644 --- a/pyiceberg/schema.py +++ b/pyiceberg/schema.py @@ -39,7 +39,7 @@ from pydantic import Field, PrivateAttr, model_validator from pyiceberg.exceptions import ResolveError -from pyiceberg.typedef import EMPTY_DICT, IcebergBaseModel, StructProtocol +from pyiceberg.typedef import EMPTY_DICT, IcebergBaseModel, StructProtocol, TableVersion from pyiceberg.types import ( BinaryType, BooleanType, @@ -1622,7 +1622,10 @@ def _project_map(map_type: MapType, value_result: IcebergType) -> MapType: @singledispatch -def promote(file_type: IcebergType, read_type: IcebergType) -> IcebergType: +def promote(file_type: IcebergType, read_type: IcebergType, format_version: Optional[TableVersion] = None) -> IcebergType: + from pyiceberg.table import TableProperties + + format_version = format_version or TableProperties.DEFAULT_FORMAT_VERSION """Promotes reading a file type to a read type. Args: @@ -1692,6 +1695,22 @@ def _(file_type: FixedType, read_type: IcebergType) -> IcebergType: raise ResolveError(f"Cannot promote {file_type} to {read_type}") +@promote.register(DateType) +def _(file_type: DateType, read_type: IcebergType, format_version: Optional[TableVersion] = None) -> IcebergType: + from pyiceberg.table import TableProperties + + format_version = format_version or TableProperties.DEFAULT_FORMAT_VERSION + if format_version < 3: + raise ResolveError("DateType promotions can only occur on v3 tables.") + + if isinstance(read_type, TimestampType): + return read_type + elif isinstance(read_type, TimestampNanoType): + return read_type + else: + raise ResolveError(f"Cannot promote {file_type} to {read_type}") + + @promote.register(UnknownType) def _(file_type: UnknownType, read_type: IcebergType) -> IcebergType: # Per V3 Spec, "Unknown" can be promoted to any Primitive type @@ -1701,7 +1720,12 @@ def _(file_type: UnknownType, read_type: IcebergType) -> IcebergType: raise ResolveError(f"Cannot promote {file_type} to {read_type}") -def _check_schema_compatible(requested_schema: Schema, provided_schema: Schema) -> None: +def _check_schema_compatible( + requested_schema: Schema, provided_schema: Schema, format_version: Optional[TableVersion] = None +) -> None: + from pyiceberg.table import TableProperties + + format_version = format_version or TableProperties.DEFAULT_FORMAT_VERSION """ Check if the `provided_schema` is compatible with `requested_schema`. @@ -1715,17 +1739,19 @@ def _check_schema_compatible(requested_schema: Schema, provided_schema: Schema) Raises: ValueError: If the schemas are not compatible. """ - pre_order_visit(requested_schema, _SchemaCompatibilityVisitor(provided_schema)) + pre_order_visit(requested_schema, _SchemaCompatibilityVisitor(provided_schema, format_version=format_version)) class _SchemaCompatibilityVisitor(PreOrderSchemaVisitor[bool]): provided_schema: Schema + format_version: TableVersion - def __init__(self, provided_schema: Schema): + def __init__(self, provided_schema: Schema, format_version: TableVersion): from rich.console import Console from rich.table import Table as RichTable self.provided_schema = provided_schema + self.format_version = format_version self.rich_table = RichTable(show_header=True, header_style="bold") self.rich_table.add_column("") self.rich_table.add_column("Table field") @@ -1766,7 +1792,7 @@ def _is_field_compatible(self, lhs: NestedField) -> bool: try: # If type can be promoted to the requested schema # it is considered compatible - promote(rhs.field_type, lhs.field_type) + promote(rhs.field_type, lhs.field_type, format_version=self.format_version) self.rich_table.add_row("✅", str(lhs), str(rhs)) return True except ResolveError: diff --git a/pyiceberg/table/update/schema.py b/pyiceberg/table/update/schema.py index b7ed7c3351..6656f2752b 100644 --- a/pyiceberg/table/update/schema.py +++ b/pyiceberg/table/update/schema.py @@ -470,7 +470,7 @@ def update_column( if not self._allow_incompatible_changes and field.field_type != field_type: try: - promote(field.field_type, field_type) + promote(field.field_type, field_type, format_version=self._transaction.table_metadata.format_version) except ResolveError as e: raise ValidationError(f"Cannot change column type: {full_name}: {field.field_type} -> {field_type}") from e @@ -894,7 +894,9 @@ def _update_column(self, field: NestedField, existing_field: NestedField) -> Non try: # If the current type is wider than the new type, then # we perform a noop - _ = promote(field.field_type, existing_field.field_type) + _ = promote( + field.field_type, existing_field.field_type, self.update_schema._transaction.table_metadata.format_version + ) except ResolveError: # If this is not the case, perform the type evolution self.update_schema.update_column(full_name, field_type=field.field_type) diff --git a/tests/conftest.py b/tests/conftest.py index 2b571d7320..c8b75510f2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -72,7 +72,7 @@ from pyiceberg.schema import Accessor, Schema from pyiceberg.serializers import ToOutputFile from pyiceberg.table import FileScanTask, Table -from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2 +from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2, TableMetadataV3 from pyiceberg.transforms import DayTransform, IdentityTransform from pyiceberg.types import ( BinaryType, @@ -2468,6 +2468,18 @@ def table_v2(example_table_metadata_v2: Dict[str, Any]) -> Table: ) +@pytest.fixture +def table_v3(example_table_metadata_v3: Dict[str, Any]) -> Table: + table_metadata = TableMetadataV3(**example_table_metadata_v3) + return Table( + identifier=("database", "table"), + metadata=table_metadata, + metadata_location=f"{table_metadata.location}/uuid.metadata.json", + io=load_file_io(), + catalog=NoopCatalog("NoopCatalog"), + ) + + @pytest.fixture def table_v2_orc(example_table_metadata_v2: Dict[str, Any]) -> Table: import copy diff --git a/tests/test_schema.py b/tests/test_schema.py index e0dba59eaa..19460a3a63 100644 --- a/tests/test_schema.py +++ b/tests/test_schema.py @@ -52,6 +52,7 @@ PrimitiveType, StringType, StructType, + TimestampNanoType, TimestampType, TimestamptzType, TimeType, @@ -1687,3 +1688,47 @@ def test_arrow_schema() -> None: ) assert base_schema.as_arrow() == expected_schema + + +def test_promote_date_to_timestamp(table_v3: Table) -> None: + """Test promoting a DateType to a TimestampType""" + current_schema = Schema(NestedField(field_id=1, name="a_date", field_type=DateType(), required=False)) + new_schema = Schema(NestedField(field_id=1, name="a_date", field_type=TimestampType(), required=False)) + + transaction = table_v3.transaction() + applied = UpdateSchema(transaction=transaction, schema=current_schema).union_by_name(new_schema)._apply() + + assert applied.as_struct() == new_schema.as_struct() + assert len(applied.fields) == 1 + assert isinstance(applied.fields[0].field_type, TimestampType) + + +def test_promote_date_to_timestampnano(table_v3: Table) -> None: + """Test promoting a DateType to a TimestampNanoType""" + current_schema = Schema(NestedField(field_id=1, name="a_date", field_type=DateType(), required=False)) + new_schema = Schema(NestedField(field_id=1, name="a_date", field_type=TimestampNanoType(), required=False)) + + transaction = table_v3.transaction() + applied = UpdateSchema(transaction=transaction, schema=current_schema).union_by_name(new_schema)._apply() + + assert applied.as_struct() == new_schema.as_struct() + assert len(applied.fields) == 1 + assert isinstance(applied.fields[0].field_type, TimestampNanoType) + + +def test_promote_date_fails_for_v1_table(table_v1: Table) -> None: + """Test that promoting a DateType fails for a v1 table""" + current_schema = Schema(NestedField(field_id=1, name="a_date", field_type=DateType(), required=False)) + new_schema = Schema(NestedField(field_id=1, name="a_date", field_type=TimestampType(), required=False)) + + with pytest.raises(ValidationError, match="Cannot change column type: a_date: date -> timestamp"): + _ = UpdateSchema(transaction=Transaction(table_v1), schema=current_schema).union_by_name(new_schema)._apply() + + +def test_promote_date_fails_for_v2_table(table_v2: Table) -> None: + """Test that promoting a DateType fails for a v2 table""" + current_schema = Schema(NestedField(field_id=1, name="a_date", field_type=DateType(), required=False)) + new_schema = Schema(NestedField(field_id=1, name="a_date", field_type=TimestampType(), required=False)) + + with pytest.raises(ValidationError, match="Cannot change column type: a_date: date -> timestamp"): + _ = UpdateSchema(transaction=Transaction(table_v2), schema=current_schema).union_by_name(new_schema)._apply()