pyiceberg/io/pyarrow.py (18 changes: 15 additions & 3 deletions)

@@ -1596,6 +1596,7 @@ def _task_to_record_batches(
                 current_batch,
                 downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
                 projected_missing_fields=projected_missing_fields,
+                format_version=format_version,
             )


@@ -1788,13 +1789,18 @@ def _to_requested_schema(
     downcast_ns_timestamp_to_us: bool = False,
     include_field_ids: bool = False,
     projected_missing_fields: Dict[int, Any] = EMPTY_DICT,
+    format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
 ) -> pa.RecordBatch:
     # We could reuse some of these visitors
     struct_array = visit_with_partner(
         requested_schema,
         batch,
         ArrowProjectionVisitor(
-            file_schema, downcast_ns_timestamp_to_us, include_field_ids, projected_missing_fields=projected_missing_fields
+            file_schema,
+            downcast_ns_timestamp_to_us,
+            include_field_ids,
+            projected_missing_fields=projected_missing_fields,
+            format_version=format_version,
         ),
         ArrowAccessor(file_schema),
     )
@@ -1808,19 +1814,23 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Array]]):
     _use_large_types: Optional[bool]
     _projected_missing_fields: Dict[int, Any]

+    _format_version: TableVersion
+
     def __init__(
         self,
         file_schema: Schema,
         downcast_ns_timestamp_to_us: bool = False,
         include_field_ids: bool = False,
         use_large_types: Optional[bool] = None,
         projected_missing_fields: Dict[int, Any] = EMPTY_DICT,
+        format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
     ) -> None:
         self._file_schema = file_schema
         self._include_field_ids = include_field_ids
         self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
         self._use_large_types = use_large_types
         self._projected_missing_fields = projected_missing_fields
+        self._format_version = format_version

         if use_large_types is not None:
             deprecation_message(
@@ -1862,7 +1872,8 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:

         if field.field_type != file_field.field_type:
             target_schema = schema_to_pyarrow(
-                promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids
+                promote(file_field.field_type, field.field_type, format_version=self._format_version),
+                include_field_ids=self._include_field_ids,
             )
             if self._use_large_types is False:
                 target_schema = _pyarrow_schema_ensure_small_types(target_schema)
@@ -2568,6 +2579,7 @@ def write_parquet(task: WriteTask) -> DataFile:
             batch=batch,
             downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
             include_field_ids=True,
+            format_version=table_metadata.format_version,
         )
         for batch in task.record_batches
     ]
@@ -2658,7 +2670,7 @@ def _check_pyarrow_schema_compatible(
         raise ValueError(
             f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
         ) from e
-    _check_schema_compatible(requested_schema, provided_schema)
+    _check_schema_compatible(requested_schema, provided_schema, format_version=format_version)


 def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str]) -> Iterator[DataFile]:
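
On the read path, the table's format version now travels from the scan through _to_requested_schema into ArrowProjectionVisitor, so promote can apply version-gated rules when planning a cast. A minimal sketch of what _cast_if_needed now computes for a date column read as a timestamp (hypothetical REPL use against this branch, not part of the diff):

    from pyiceberg.io.pyarrow import schema_to_pyarrow
    from pyiceberg.schema import promote
    from pyiceberg.types import DateType, TimestampType

    # The file holds a date column; the requested schema asks for a timestamp.
    # On a v3 table the file type is promoted before the Arrow cast target is built.
    promoted = promote(DateType(), TimestampType(), format_version=3)
    target = schema_to_pyarrow(promoted)  # expected: pyarrow timestamp[us]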
pyiceberg/schema.py (38 changes: 32 additions & 6 deletions)
@@ -39,7 +39,7 @@
 from pydantic import Field, PrivateAttr, model_validator

 from pyiceberg.exceptions import ResolveError
-from pyiceberg.typedef import EMPTY_DICT, IcebergBaseModel, StructProtocol
+from pyiceberg.typedef import EMPTY_DICT, IcebergBaseModel, StructProtocol, TableVersion
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -1622,7 +1622,10 @@ def _project_map(map_type: MapType, value_result: IcebergType) -> MapType:


 @singledispatch
-def promote(file_type: IcebergType, read_type: IcebergType) -> IcebergType:
+def promote(file_type: IcebergType, read_type: IcebergType, format_version: Optional[TableVersion] = None) -> IcebergType:
+    from pyiceberg.table import TableProperties
+
+    format_version = format_version or TableProperties.DEFAULT_FORMAT_VERSION
     """Promotes reading a file type to a read type.

     Args:
@@ -1692,6 +1695,22 @@ def _(file_type: FixedType, read_type: IcebergType) -> IcebergType:
     raise ResolveError(f"Cannot promote {file_type} to {read_type}")


+@promote.register(DateType)
+def _(file_type: DateType, read_type: IcebergType, format_version: Optional[TableVersion] = None) -> IcebergType:
+    from pyiceberg.table import TableProperties
+
+    format_version = format_version or TableProperties.DEFAULT_FORMAT_VERSION
+    if format_version < 3:
+        raise ResolveError("DateType promotions can only occur on v3 tables.")
+
+    if isinstance(read_type, TimestampType):
+        return read_type
+    elif isinstance(read_type, TimestampNanoType):
+        return read_type
+    else:
+        raise ResolveError(f"Cannot promote {file_type} to {read_type}")
+
+
 @promote.register(UnknownType)
 def _(file_type: UnknownType, read_type: IcebergType) -> IcebergType:
     # Per V3 Spec, "Unknown" can be promoted to any Primitive type
@@ -1701,7 +1720,12 @@ def _(file_type: UnknownType, read_type: IcebergType) -> IcebergType:
     raise ResolveError(f"Cannot promote {file_type} to {read_type}")


-def _check_schema_compatible(requested_schema: Schema, provided_schema: Schema) -> None:
+def _check_schema_compatible(
+    requested_schema: Schema, provided_schema: Schema, format_version: Optional[TableVersion] = None
+) -> None:
+    from pyiceberg.table import TableProperties
+
+    format_version = format_version or TableProperties.DEFAULT_FORMAT_VERSION
     """
     Check if the `provided_schema` is compatible with `requested_schema`.

@@ -1715,17 +1739,19 @@ def _check_schema_compatible(requested_schema: Schema, provided_schema: Schema) -> None:
     Raises:
         ValueError: If the schemas are not compatible.
     """
-    pre_order_visit(requested_schema, _SchemaCompatibilityVisitor(provided_schema))
+    pre_order_visit(requested_schema, _SchemaCompatibilityVisitor(provided_schema, format_version=format_version))


 class _SchemaCompatibilityVisitor(PreOrderSchemaVisitor[bool]):
     provided_schema: Schema
+    format_version: TableVersion

-    def __init__(self, provided_schema: Schema):
+    def __init__(self, provided_schema: Schema, format_version: TableVersion):
         from rich.console import Console
         from rich.table import Table as RichTable

         self.provided_schema = provided_schema
+        self.format_version = format_version
         self.rich_table = RichTable(show_header=True, header_style="bold")
         self.rich_table.add_column("")
         self.rich_table.add_column("Table field")
@@ -1766,7 +1792,7 @@ def _is_field_compatible(self, lhs: NestedField) -> bool:
             try:
                 # If type can be promoted to the requested schema
                 # it is considered compatible
-                promote(rhs.field_type, lhs.field_type)
+                promote(rhs.field_type, lhs.field_type, format_version=self.format_version)
                 self.rich_table.add_row("✅", str(lhs), str(rhs))
                 return True
             except ResolveError:
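
The net effect in schema.py: promote is now version-aware, and the new DateType overload only admits date-to-timestamp widening on v3 tables. A short sketch of the intended behavior (hypothetical usage against this branch):

    from pyiceberg.exceptions import ResolveError
    from pyiceberg.schema import promote
    from pyiceberg.types import DateType, TimestampNanoType, TimestampType

    # v3 tables: a date column may be read as timestamp or timestamp_ns.
    assert promote(DateType(), TimestampType(), format_version=3) == TimestampType()
    assert promote(DateType(), TimestampNanoType(), format_version=3) == TimestampNanoType()

    # v1/v2 tables: the same promotion is rejected.
    try:
        promote(DateType(), TimestampType(), format_version=2)
    except ResolveError as err:
        print(err)  # "DateType promotions can only occur on v3 tables."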
pyiceberg/table/update/schema.py (6 changes: 4 additions & 2 deletions)

@@ -470,7 +470,7 @@ def update_column(

         if not self._allow_incompatible_changes and field.field_type != field_type:
             try:
-                promote(field.field_type, field_type)
+                promote(field.field_type, field_type, format_version=self._transaction.table_metadata.format_version)
             except ResolveError as e:
                 raise ValidationError(f"Cannot change column type: {full_name}: {field.field_type} -> {field_type}") from e

@@ -894,7 +894,9 @@ def _update_column(self, field: NestedField, existing_field: NestedField) -> None:
         try:
             # If the current type is wider than the new type, then
             # we perform a noop
-            _ = promote(field.field_type, existing_field.field_type)
+            _ = promote(
+                field.field_type, existing_field.field_type, self.update_schema._transaction.table_metadata.format_version
+            )
         except ResolveError:
             # If this is not the case, perform the type evolution
             self.update_schema.update_column(full_name, field_type=field.field_type)
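
Callers of the public schema-evolution API are unaffected; the format version is read from the transaction's table metadata. An illustrative (hypothetical) end-to-end use, widening a date column on an already-loaded v3 table named here as `table`:

    from pyiceberg.types import TimestampType

    # `table` is assumed to be a loaded v3 Table with a date column "a_date".
    with table.update_schema() as update:
        update.update_column("a_date", field_type=TimestampType())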
tests/conftest.py (14 changes: 13 additions & 1 deletion)

@@ -72,7 +72,7 @@
 from pyiceberg.schema import Accessor, Schema
 from pyiceberg.serializers import ToOutputFile
 from pyiceberg.table import FileScanTask, Table
-from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2
+from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2, TableMetadataV3
 from pyiceberg.transforms import DayTransform, IdentityTransform
 from pyiceberg.types import (
     BinaryType,
@@ -2468,6 +2468,18 @@ def table_v2(example_table_metadata_v2: Dict[str, Any]) -> Table:
     )


+@pytest.fixture
+def table_v3(example_table_metadata_v3: Dict[str, Any]) -> Table:
+    table_metadata = TableMetadataV3(**example_table_metadata_v3)
+    return Table(
+        identifier=("database", "table"),
+        metadata=table_metadata,
+        metadata_location=f"{table_metadata.location}/uuid.metadata.json",
+        io=load_file_io(),
+        catalog=NoopCatalog("NoopCatalog"),
+    )
+
+
 @pytest.fixture
 def table_v2_orc(example_table_metadata_v2: Dict[str, Any]) -> Table:
     import copy
tests/test_schema.py (45 changes: 45 additions & 0 deletions)

@@ -52,6 +52,7 @@
     PrimitiveType,
     StringType,
     StructType,
+    TimestampNanoType,
     TimestampType,
     TimestamptzType,
     TimeType,
@@ -1687,3 +1688,47 @@ def test_arrow_schema() -> None:
     )

     assert base_schema.as_arrow() == expected_schema
+
+
+def test_promote_date_to_timestamp(table_v3: Table) -> None:
+    """Test promoting a DateType to a TimestampType"""
+    current_schema = Schema(NestedField(field_id=1, name="a_date", field_type=DateType(), required=False))
+    new_schema = Schema(NestedField(field_id=1, name="a_date", field_type=TimestampType(), required=False))
+
+    transaction = table_v3.transaction()
+    applied = UpdateSchema(transaction=transaction, schema=current_schema).union_by_name(new_schema)._apply()
+
+    assert applied.as_struct() == new_schema.as_struct()
+    assert len(applied.fields) == 1
+    assert isinstance(applied.fields[0].field_type, TimestampType)
+
+
+def test_promote_date_to_timestampnano(table_v3: Table) -> None:
+    """Test promoting a DateType to a TimestampNanoType"""
+    current_schema = Schema(NestedField(field_id=1, name="a_date", field_type=DateType(), required=False))
+    new_schema = Schema(NestedField(field_id=1, name="a_date", field_type=TimestampNanoType(), required=False))
+
+    transaction = table_v3.transaction()
+    applied = UpdateSchema(transaction=transaction, schema=current_schema).union_by_name(new_schema)._apply()
+
+    assert applied.as_struct() == new_schema.as_struct()
+    assert len(applied.fields) == 1
+    assert isinstance(applied.fields[0].field_type, TimestampNanoType)
+
+
+def test_promote_date_fails_for_v1_table(table_v1: Table) -> None:
+    """Test that promoting a DateType fails for a v1 table"""
+    current_schema = Schema(NestedField(field_id=1, name="a_date", field_type=DateType(), required=False))
+    new_schema = Schema(NestedField(field_id=1, name="a_date", field_type=TimestampType(), required=False))
+
+    with pytest.raises(ValidationError, match="Cannot change column type: a_date: date -> timestamp"):
+        _ = UpdateSchema(transaction=Transaction(table_v1), schema=current_schema).union_by_name(new_schema)._apply()
+
+
+def test_promote_date_fails_for_v2_table(table_v2: Table) -> None:
+    """Test that promoting a DateType fails for a v2 table"""
+    current_schema = Schema(NestedField(field_id=1, name="a_date", field_type=DateType(), required=False))
+    new_schema = Schema(NestedField(field_id=1, name="a_date", field_type=TimestampType(), required=False))
+
+    with pytest.raises(ValidationError, match="Cannot change column type: a_date: date -> timestamp"):
+        _ = UpdateSchema(transaction=Transaction(table_v2), schema=current_schema).union_by_name(new_schema)._apply()