Merged
Changes from 4 commits
4 changes: 2 additions & 2 deletions dev/Dockerfile
@@ -36,9 +36,9 @@ ENV PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$
RUN mkdir -p ${HADOOP_HOME} && mkdir -p ${SPARK_HOME} && mkdir -p /home/iceberg/spark-events
WORKDIR ${SPARK_HOME}

ENV SPARK_VERSION=3.5.0
ENV SPARK_VERSION=3.5.3
ENV ICEBERG_SPARK_RUNTIME_VERSION=3.5_2.12
ENV ICEBERG_VERSION=1.6.0
ENV ICEBERG_VERSION=1.6.1
ENV PYICEBERG_VERSION=0.7.1

RUN curl --retry 3 -s -C - https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
2 changes: 1 addition & 1 deletion pyiceberg/catalog/__init__.py
@@ -1011,4 +1011,4 @@ def _empty_table_metadata() -> TableMetadata:
Returns:
TableMetadata: An empty TableMetadata instance.
"""
return TableMetadataV1(location="", last_column_id=-1, schema=Schema())
return TableMetadataV1.model_construct(last_column_id=-1, schema=Schema())
16 changes: 16 additions & 0 deletions pyiceberg/table/metadata.py
@@ -587,5 +587,21 @@ def parse_obj(data: Dict[str, Any]) -> TableMetadata:
else:
raise ValidationError(f"Unknown format version: {format_version}")

@staticmethod
def _construct_without_validation(table_metadata: TableMetadata) -> TableMetadata:
"""Construct table metadata from an existing table without performing validation.

This method is useful during a sequence of table updates when the model needs to be re-constructed but is not yet ready for validation.
"""
if table_metadata.format_version is None:
raise ValidationError(f"Missing format-version in TableMetadata: {table_metadata}")

if table_metadata.format_version == 1:
return TableMetadataV1.model_construct(**dict(table_metadata))
elif table_metadata.format_version == 2:
return TableMetadataV2.model_construct(**dict(table_metadata))
else:
raise ValidationError(f"Unknown format version: {table_metadata.format_version}")


TableMetadata = Annotated[Union[TableMetadataV1, TableMetadataV2], Field(discriminator="format_version")] # type: ignore
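
For context on the pattern used above: model_construct is pydantic v2's documented way to build a model instance while skipping validation, which is what lets _empty_table_metadata and _construct_without_validation assemble intentionally incomplete metadata. A minimal standalone sketch, assuming pydantic v2 semantics (the Config model is hypothetical, not part of pyiceberg):

from pydantic import BaseModel, ValidationError

class Config(BaseModel):
    name: str
    retries: int = 3

# model_validate runs the validators and rejects incomplete input
try:
    Config.model_validate({})
except ValidationError as exc:
    print(exc.error_count())  # 1: the required 'name' field is missing

# model_construct skips validation: defaults are filled in and
# missing required fields are simply left unset on the instance
cfg = Config.model_construct(retries=5)
print(cfg.retries)  # 5, no error raised
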
39 changes: 28 additions & 11 deletions pyiceberg/table/update/__init__.py
@@ -18,7 +18,6 @@

import uuid
from abc import ABC, abstractmethod
from copy import copy
from datetime import datetime
from functools import singledispatch
from typing import TYPE_CHECKING, Any, Dict, Generic, List, Literal, Optional, Tuple, TypeVar, Union
@@ -45,6 +44,7 @@
transform_dict_value_to_str,
)
from pyiceberg.utils.datetime import datetime_to_millis
from pyiceberg.utils.deprecated import deprecation_notice
from pyiceberg.utils.properties import property_as_int

if TYPE_CHECKING:
@@ -90,7 +90,13 @@ class AddSchemaUpdate(IcebergBaseModel):
# This field is required: https://github.com/apache/iceberg/pull/7445
last_column_id: int = Field(alias="last-column-id")

initial_change: bool = Field(default=False, exclude=True)
initial_change: bool = Field(
Reviewer (Contributor): It is clean to go through the validation cycle, but I would be surprised if anyone would be relying on these properties
default=False,
exclude=True,
deprecated=deprecation_notice(
deprecated_in="0.8.0", removed_in="0.9.0", help_message="CreateTableTransaction can work without this field"
),
)


class SetCurrentSchemaUpdate(IcebergBaseModel):
@@ -104,7 +110,13 @@ class AddPartitionSpecUpdate(IcebergBaseModel):
action: Literal["add-spec"] = Field(default="add-spec")
spec: PartitionSpec

initial_change: bool = Field(default=False, exclude=True)
initial_change: bool = Field(
default=False,
exclude=True,
deprecated=deprecation_notice(
deprecated_in="0.8.0", removed_in="0.9.0", help_message="CreateTableTransaction can work without this field"
),
)


class SetDefaultSpecUpdate(IcebergBaseModel):
@@ -118,7 +130,13 @@ class AddSortOrderUpdate(IcebergBaseModel):
action: Literal["add-sort-order"] = Field(default="add-sort-order")
sort_order: SortOrder = Field(alias="sort-order")

initial_change: bool = Field(default=False, exclude=True)
initial_change: bool = Field(
default=False,
exclude=True,
deprecated=deprecation_notice(
deprecated_in="0.8.0", removed_in="0.9.0", help_message="CreateTableTransaction can work without this field"
),
)


class SetDefaultSortOrderUpdate(IcebergBaseModel):
@@ -267,11 +285,10 @@ def _(
elif update.format_version == base_metadata.format_version:
return base_metadata

updated_metadata_data = copy(base_metadata.model_dump())
updated_metadata_data["format-version"] = update.format_version
updated_metadata = base_metadata.model_copy(update={"format_version": update.format_version})
Reviewer (Contributor): Nice!
context.add_update(update)
return TableMetadataUtil.parse_obj(updated_metadata_data)
return TableMetadataUtil._construct_without_validation(updated_metadata)
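
Worth noting: pydantic's model_copy(update=...) applies the update dict without running validators, which is why the result is passed through _construct_without_validation rather than being dumped to a dict and re-parsed with full validation. A minimal sketch of that behavior, using a hypothetical Point model:

from pydantic import BaseModel

class Point(BaseModel):
    x: int
    y: int

p = Point(x=1, y=2)
# the update dict is applied as-is; no validators run on the new value
q = p.model_copy(update={"x": "not-an-int"})
print(q.x)  # prints 'not-an-int' instead of raising a validation error
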


@_apply_table_update.register(SetPropertiesUpdate)
@@ -306,7 +323,7 @@ def _(update: AddSchemaUpdate, base_metadata: TableMetadata, context: _TableMeta

metadata_updates: Dict[str, Any] = {
"last_column_id": update.last_column_id,
"schemas": [update.schema_] if update.initial_change else base_metadata.schemas + [update.schema_],
"schemas": base_metadata.schemas + [update.schema_],
}

context.add_update(update)
@@ -336,11 +353,11 @@ def _(update: SetCurrentSchemaUpdate, base_metadata: TableMetadata, context: _Ta
@_apply_table_update.register(AddPartitionSpecUpdate)
def _(update: AddPartitionSpecUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
for spec in base_metadata.partition_specs:
if spec.spec_id == update.spec.spec_id and not update.initial_change:
if spec.spec_id == update.spec.spec_id:
raise ValueError(f"Partition spec with id {spec.spec_id} already exists: {spec}")

metadata_updates: Dict[str, Any] = {
"partition_specs": [update.spec] if update.initial_change else base_metadata.partition_specs + [update.spec],
"partition_specs": base_metadata.partition_specs + [update.spec],
"last_partition_id": max(
max([field.field_id for field in update.spec.fields], default=0),
base_metadata.last_partition_id or PARTITION_FIELD_ID_START - 1,
@@ -448,7 +465,7 @@ def _(update: AddSortOrderUpdate, base_metadata: TableMetadata, context: _TableM
context.add_update(update)
return base_metadata.model_copy(
update={
"sort_orders": [update.sort_order] if update.initial_change else base_metadata.sort_orders + [update.sort_order],
"sort_orders": base_metadata.sort_orders + [update.sort_order],
}
)

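The deprecated= argument passed to Field in the update classes above relies on pydantic support (added around pydantic 2.7): reading the attribute emits a DeprecationWarning carrying the given message. A hedged sketch of the mechanism with a throwaway model, names illustrative rather than pyiceberg's:

import warnings

from pydantic import BaseModel, Field

class Update(BaseModel):
    # illustrative stand-in for fields like AddSchemaUpdate.initial_change
    initial_change: bool = Field(
        default=False,
        deprecated="Deprecated in 0.8.0, will be removed in 0.9.0.",
    )

u = Update()
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    _ = u.initial_change  # reading the attribute triggers the warning
print(caught[0].category)  # <class 'DeprecationWarning'>
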
9 changes: 6 additions & 3 deletions pyiceberg/utils/deprecated.py
@@ -41,14 +41,17 @@ def new_func(*args: Any, **kwargs: Any) -> Any:
return decorator


def deprecation_notice(deprecated_in: str, removed_in: str, help_message: Optional[str]) -> str:
"""Return a deprecation notice."""
return f"Deprecated in {deprecated_in}, will be removed in {removed_in}. {help_message}"


def deprecation_message(deprecated_in: str, removed_in: str, help_message: Optional[str]) -> None:
"""Mark properties or behaviors as deprecated.

Adding this will result in a warning being emitted.
"""
message = f"Deprecated in {deprecated_in}, will be removed in {removed_in}. {help_message}"

_deprecation_warning(message)
_deprecation_warning(deprecation_notice(deprecated_in, removed_in, help_message))


def _deprecation_warning(message: str) -> None:
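Splitting the message construction out into deprecation_notice lets the same string feed both the runtime warning (via deprecation_message) and pydantic's deprecated= field metadata, as in the update classes above. A short usage sketch based on the helpers shown in this diff:

from pyiceberg.utils.deprecated import deprecation_message, deprecation_notice

# Build the notice string without emitting anything,
# e.g. to pass as Field(deprecated=...)
notice = deprecation_notice(
    deprecated_in="0.8.0",
    removed_in="0.9.0",
    help_message="CreateTableTransaction can work without this field",
)
print(notice)
# Deprecated in 0.8.0, will be removed in 0.9.0. CreateTableTransaction can work without this field

# Or emit the DeprecationWarning immediately at a call site
deprecation_message(
    deprecated_in="0.8.0",
    removed_in="0.9.0",
    help_message="CreateTableTransaction can work without this field",
)
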
61 changes: 60 additions & 1 deletion tests/catalog/test_sql.py
@@ -41,7 +41,7 @@
)
from pyiceberg.io import FSSPEC_FILE_IO, PY_IO_IMPL
from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.snapshots import Operation
from pyiceberg.table.sorting import (
@@ -1492,6 +1492,65 @@ def test_create_table_transaction(catalog: SqlCatalog, format_version: int) -> N
assert len(tbl.scan().to_arrow()) == 6


@pytest.mark.parametrize(
"catalog",
[lazy_fixture("catalog_memory"), lazy_fixture("catalog_sqlite")],
)
@pytest.mark.parametrize("format_version", [1, 2])
def test_create_table_transaction_with_non_default_values(
catalog: SqlCatalog, table_schema_with_all_types: Schema, format_version: int
) -> None:
identifier = f"default.create_table_transaction_with_non_default_values_{format_version}"
identifier_ref = f"default.create_table_with_non_default_values_ref_{format_version}"
try:
catalog.create_namespace("default")
except NamespaceAlreadyExistsError:
pass

try:
catalog.drop_table(identifier=identifier)
except NoSuchTableError:
pass

try:
catalog.drop_table(identifier=identifier_ref)
except NoSuchTableError:
pass

iceberg_spec = PartitionSpec(*[
PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="integer_partition")
])

sort_order = SortOrder(*[SortField(source_id=2, transform=IdentityTransform(), direction=SortDirection.ASC)])

txn = catalog.create_table_transaction(
identifier=identifier,
schema=table_schema_with_all_types,
partition_spec=iceberg_spec,
sort_order=sort_order,
properties={"format-version": format_version},
)
txn.commit_transaction()

tbl = catalog.load_table(identifier)

tbl_ref = catalog.create_table(
identifier=identifier_ref,
schema=table_schema_with_all_types,
partition_spec=iceberg_spec,
sort_order=sort_order,
properties={"format-version": format_version},
)

assert tbl.format_version == tbl_ref.format_version
assert tbl.schema() == tbl_ref.schema()
assert tbl.schemas() == tbl_ref.schemas()
assert tbl.spec() == tbl_ref.spec()
assert tbl.specs() == tbl_ref.specs()
assert tbl.sort_order() == tbl_ref.sort_order()
assert tbl.sort_orders() == tbl_ref.sort_orders()


@pytest.mark.parametrize(
"catalog",
[
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -2221,7 +2221,7 @@ def spark() -> "SparkSession":

spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2])
scala_version = "2.12"
iceberg_version = "1.4.3"
iceberg_version = "1.6.1"

os.environ["PYSPARK_SUBMIT_ARGS"] = (
f"--packages org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version},"
7 changes: 3 additions & 4 deletions tests/integration/test_deletes.py
@@ -218,9 +218,8 @@ def test_delete_partitioned_table_positional_deletes(spark: SparkSession, sessio
# Will rewrite a data file without the positional delete
tbl.delete(EqualTo("number", 40))

# One positional delete has been added, but an OVERWRITE status is set
# https://github.com/apache/iceberg/issues/10122
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "overwrite", "overwrite"]
# One positional delete has been added
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "delete", "overwrite"]
assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10], "number": [20]}


@@ -448,7 +447,7 @@ def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSessio
assert len(snapshots) == 3

# Snapshots produced by Spark
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()[0:2]] == ["append", "overwrite"]
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()[0:2]] == ["append", "delete"]

# Will rewrite one parquet file
assert snapshots[2].summary == Summary(