diff --git a/pyiceberg/catalog/dynamodb.py b/pyiceberg/catalog/dynamodb.py
index 3b37762638..2c1b19beb4 100644
--- a/pyiceberg/catalog/dynamodb.py
+++ b/pyiceberg/catalog/dynamodb.py
@@ -53,7 +53,7 @@
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableResponse, Table
+from pyiceberg.table import CommitTableResponse, Table, TableProperties
 from pyiceberg.table.locations import load_location_provider
 from pyiceberg.table.metadata import new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -181,7 +181,10 @@ def create_table(
             ValueError: If the identifier is invalid, or no path is given to store metadata.
         """
-        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore
+        schema: Schema = self._convert_schema_if_needed(  # type: ignore
+            schema,
+            int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)),  # type: ignore
+        )
 
         database_name, table_name = self.identifier_to_database_and_table(identifier)
diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py
index 880a4db481..0167b5a1c1 100644
--- a/pyiceberg/catalog/sql.py
+++ b/pyiceberg/catalog/sql.py
@@ -62,7 +62,7 @@
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableResponse, Table
+from pyiceberg.table import CommitTableResponse, Table, TableProperties
 from pyiceberg.table.locations import load_location_provider
 from pyiceberg.table.metadata import new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -200,7 +200,10 @@ def create_table(
             ValueError: If the identifier is invalid, or no path is given to store metadata.
         """
-        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore
+        schema: Schema = self._convert_schema_if_needed(  # type: ignore
+            schema,
+            int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)),  # type: ignore
+        )
 
         namespace_identifier = Catalog.namespace_from(identifier)
         table_name = Catalog.table_name_from(identifier)
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index ab85893ab4..df7e5eadef 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1453,6 +1453,7 @@ def _task_to_record_batches(
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
     partition_spec: Optional[PartitionSpec] = None,
+    format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
 ) -> Iterator[pa.RecordBatch]:
     arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
     with io.new_input(task.file.file_path).open() as fin:
@@ -1462,7 +1463,9 @@
         # Hence it is reasonable to always cast 'ns' timestamp to 'us' on read.
         # When V3 support is introduced, we will update `downcast_ns_timestamp_to_us` flag based on
         # the table format version.
-        file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True)
+        file_schema = pyarrow_to_schema(
+            physical_schema, name_mapping, downcast_ns_timestamp_to_us=True, format_version=format_version
+        )
 
         # Apply column projection rules: https://iceberg.apache.org/spec/#column-projection
         projected_missing_fields = _get_column_projection_values(
@@ -1691,6 +1694,7 @@ def _record_batches_from_scan_tasks_and_deletes(
                 self._case_sensitive,
                 self._table_metadata.name_mapping(),
                 self._table_metadata.specs().get(task.file.spec_id),
+                self._table_metadata.format_version,
             )
             for batch in batches:
                 if self._limit is not None:
diff --git a/pyiceberg/table/update/schema.py b/pyiceberg/table/update/schema.py
index 6ad01e97f2..70c94e004a 100644
--- a/pyiceberg/table/update/schema.py
+++ b/pyiceberg/table/update/schema.py
@@ -34,6 +34,7 @@
     visit,
     visit_with_partner,
 )
+from pyiceberg.table import TableProperties
 from pyiceberg.table.name_mapping import (
     NameMapping,
     update_mapping,
@@ -48,7 +49,7 @@
     UpdatesAndRequirements,
     UpdateTableMetadata,
 )
-from pyiceberg.typedef import L
+from pyiceberg.typedef import L, TableVersion
 from pyiceberg.types import IcebergType, ListType, MapType, NestedField, PrimitiveType, StructType
 
 if TYPE_CHECKING:
@@ -142,11 +143,13 @@ def case_sensitive(self, case_sensitive: bool) -> UpdateSchema:
         self._case_sensitive = case_sensitive
         return self
 
-    def union_by_name(self, new_schema: Union[Schema, "pa.Schema"]) -> UpdateSchema:
+    def union_by_name(
+        self, new_schema: Union[Schema, "pa.Schema"], format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION
+    ) -> UpdateSchema:
         from pyiceberg.catalog import Catalog
 
         visit_with_partner(
-            Catalog._convert_schema_if_needed(new_schema),
+            Catalog._convert_schema_if_needed(new_schema, format_version=format_version),
             -1,
             _UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive),  # type: ignore
diff --git a/tests/conftest.py b/tests/conftest.py
index e036a2fa54..16c9e06dac 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2811,28 +2811,6 @@ def arrow_table_schema_with_all_microseconds_timestamp_precisions() -> "pa.Schema":
     )
 
 
-@pytest.fixture(scope="session")
-def arrow_table_schema_with_nanoseconds_timestamp_precisions() -> "pa.Schema":
-    """Pyarrow Schema with all microseconds timestamp."""
-    import pyarrow as pa
-
-    return pa.schema(
-        [
-            ("timestamp_s", pa.timestamp(unit="us")),
-            ("timestamptz_s", pa.timestamp(unit="us", tz="UTC")),
-            ("timestamp_ms", pa.timestamp(unit="us")),
-            ("timestamptz_ms", pa.timestamp(unit="us", tz="UTC")),
-            ("timestamp_us", pa.timestamp(unit="us")),
-            ("timestamptz_us", pa.timestamp(unit="us", tz="UTC")),
-            ("timestamp_ns", pa.timestamp(unit="us")),
-            ("timestamptz_ns", pa.timestamp(unit="ns", tz="UTC")),
-            ("timestamptz_us_etc_utc", pa.timestamp(unit="us", tz="UTC")),
-            ("timestamptz_ns_z", pa.timestamp(unit="ns", tz="UTC")),
-            ("timestamptz_s_0000", pa.timestamp(unit="us", tz="UTC")),
-        ]
-    )
-
-
 @pytest.fixture(scope="session")
 def table_schema_with_all_microseconds_timestamp_precision() -> Schema:
     """Iceberg table Schema with only date, timestamp and timestamptz values."""
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index 38aea1e255..5f0440b9cf 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -2251,13 +2251,19 @@ def test_branch_py_write_spark_read(session_catalog: Catalog, spark: SparkSession
 
 @pytest.mark.integration
 def test_nanosecond_support_on_catalog(session_catalog: Catalog) -> None:
     identifier = "default.test_nanosecond_support_on_catalog"
-    # Create a pyarrow table with a nanosecond timestamp column
-    table = pa.Table.from_arrays(
-        [
-            pa.array([datetime.now()], type=pa.timestamp("ns")),
-            pa.array([datetime.now()], type=pa.timestamp("ns", tz="America/New_York")),
-        ],
-        names=["timestamp_ns", "timestamptz_ns"],
+
+    catalog = load_catalog("default", type="in-memory")
+    catalog.create_namespace("ns")
+
+    table = pa.Table.from_arrays([pa.array([datetime.now()], type=pa.timestamp("ns"))], names=["timestamps"])
+    table2 = pa.Table.from_arrays(
+        [pa.array([datetime.now()], type=pa.timestamp("ns", tz="America/New_York"))], names=["timestamps"]
     )
     _create_table(session_catalog, identifier, {"format-version": "3"}, schema=table.schema)
+
+    with pytest.raises(NotImplementedError, match="Writing V3 is not yet supported"):
+        catalog.create_table("ns.table1", schema=table.schema, properties={"format-version": "3"})
+
+    with pytest.raises(NotImplementedError, match="Writing V3 is not yet supported"):
+        catalog.create_table("ns.table2", schema=table2.schema, properties={"format-version": "3"})
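
Taken together, these changes thread the table's format version into schema conversion while V3 metadata itself still cannot be written. Below is a minimal sketch of the resulting behaviour, mirroring the new integration test above; the in-memory catalog comes from that test, and the namespace and table names here are purely illustrative.

from datetime import datetime

import pyarrow as pa
import pytest

from pyiceberg.catalog import load_catalog

# In-memory catalog, as used in the new test; namespace and table names are illustrative.
catalog = load_catalog("default", type="in-memory")
catalog.create_namespace("ns")

# A PyArrow table whose schema carries a nanosecond timestamp column.
arrow_table = pa.Table.from_arrays([pa.array([datetime.now()], type=pa.timestamp("ns"))], names=["timestamps"])

# create_table() now converts the schema with the requested format version in mind,
# but writing V3 table metadata is still rejected.
with pytest.raises(NotImplementedError, match="Writing V3 is not yet supported"):
    catalog.create_table("ns.t", schema=arrow_table.schema, properties={"format-version": "3"})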