
Commit 102e58b

Add the rest of the _convert_schema_if_needed calls

1 parent 8b43eb8
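
For orientation, a minimal sketch of the call shape this commit threads through the remaining call sites. This is not the pyiceberg implementation (the real helper lives on pyiceberg.catalog.Catalog); the stub only mirrors the (schema, format_version) signature visible in the diffs below.

# Sketch only: mirrors the (schema, format_version) call shape from this
# commit; the actual conversion logic in pyiceberg is elided.
from typing import Union

import pyarrow as pa

from pyiceberg.schema import Schema


def convert_schema_if_needed(schema: Union[Schema, pa.Schema], format_version: int = 2) -> Schema:
    """Pass an Iceberg Schema through unchanged; convert a pa.Schema otherwise."""
    if isinstance(schema, Schema):
        return schema
    # In pyiceberg, the pa.Schema is visited and converted here; format_version
    # decides, for example, whether timestamp[ns] is representable (V3 only).
    raise NotImplementedError("pa.Schema conversion elided in this sketch")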

File tree

6 files changed (+33, -41 lines)


pyiceberg/catalog/dynamodb.py

Lines changed: 5 additions & 2 deletions
@@ -53,7 +53,7 @@
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableResponse, Table
+from pyiceberg.table import CommitTableResponse, Table, TableProperties
 from pyiceberg.table.locations import load_location_provider
 from pyiceberg.table.metadata import new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -181,7 +181,10 @@ def create_table(
             ValueError: If the identifier is invalid, or no path is given to store metadata.

         """
-        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore
+        schema: Schema = self._convert_schema_if_needed(  # type: ignore
+            schema,
+            int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)),  # type: ignore
+        )

         database_name, table_name = self.identifier_to_database_and_table(identifier)
pyiceberg/catalog/sql.py

Lines changed: 5 additions & 2 deletions
@@ -62,7 +62,7 @@
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableResponse, Table
+from pyiceberg.table import CommitTableResponse, Table, TableProperties
 from pyiceberg.table.locations import load_location_provider
 from pyiceberg.table.metadata import new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -200,7 +200,10 @@ def create_table(
             ValueError: If the identifier is invalid, or no path is given to store metadata.

         """
-        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore
+        schema: Schema = self._convert_schema_if_needed(  # type: ignore
+            schema,
+            int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)),  # type: ignore
+        )

         namespace_identifier = Catalog.namespace_from(identifier)
         table_name = Catalog.table_name_from(identifier)

pyiceberg/io/pyarrow.py

Lines changed: 5 additions & 1 deletion
@@ -1483,6 +1483,7 @@ def _task_to_record_batches(
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
     partition_spec: Optional[PartitionSpec] = None,
+    format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
 ) -> Iterator[pa.RecordBatch]:
     arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
     with io.new_input(task.file.file_path).open() as fin:
@@ -1492,7 +1493,9 @@
         # Hence it is reasonable to always cast 'ns' timestamp to 'us' on read.
         # When V3 support is introduced, we will update `downcast_ns_timestamp_to_us` flag based on
         # the table format version.
-        file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True)
+        file_schema = pyarrow_to_schema(
+            physical_schema, name_mapping, downcast_ns_timestamp_to_us=True, format_version=format_version
+        )

         # Apply column projection rules: https://iceberg.apache.org/spec/#column-projection
         projected_missing_fields = _get_column_projection_values(
@@ -1721,6 +1724,7 @@ def _record_batches_from_scan_tasks_and_deletes(
             self._case_sensitive,
             self._table_metadata.name_mapping(),
             self._table_metadata.specs().get(task.file.spec_id),
+            self._table_metadata.format_version,
         )
         for batch in batches:
             if self._limit is not None:
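
For background: Iceberg V2 has no nanosecond timestamp type, so 'ns' Arrow timestamps are downcast to 'us' on read, while V3 adds timestamp_ns. The hunk's comment notes the downcast flag will become version-dependent once V3 is supported; a small pyarrow sketch of that decision (illustrative only, the real mapping lives in pyarrow_to_schema):

# Illustrative only: shows the version-dependent downcast the comment in
# the hunk above anticipates. V2 tables cannot represent timestamp[ns].
import pyarrow as pa


def downcast_ns_field(field: pa.Field, format_version: int) -> pa.Field:
    # Only V2-and-below tables need the ns -> us downcast.
    if format_version <= 2 and pa.types.is_timestamp(field.type) and field.type.unit == "ns":
        return pa.field(field.name, pa.timestamp("us", tz=field.type.tz), nullable=field.nullable)
    return field


ts = pa.field("ts", pa.timestamp("ns"))
print(downcast_ns_field(ts, format_version=2).type)  # timestamp[us]
print(downcast_ns_field(ts, format_version=3).type)  # timestamp[ns]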

pyiceberg/table/update/schema.py

Lines changed: 5 additions & 3 deletions
@@ -48,7 +48,7 @@
     UpdatesAndRequirements,
     UpdateTableMetadata,
 )
-from pyiceberg.typedef import L
+from pyiceberg.typedef import L, TableVersion
 from pyiceberg.types import IcebergType, ListType, MapType, NestedField, PrimitiveType, StructType

 if TYPE_CHECKING:
@@ -142,11 +142,13 @@ def case_sensitive(self, case_sensitive: bool) -> UpdateSchema:
         self._case_sensitive = case_sensitive
         return self

-    def union_by_name(self, new_schema: Union[Schema, "pa.Schema"]) -> UpdateSchema:
+    def union_by_name(
+        self, new_schema: Union[Schema, "pa.Schema"], format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION
+    ) -> UpdateSchema:
         from pyiceberg.catalog import Catalog

         visit_with_partner(
-            Catalog._convert_schema_if_needed(new_schema),
+            Catalog._convert_schema_if_needed(new_schema, format_version=format_version),
             -1,
             _UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive),  # type: ignore
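
A hypothetical use of the widened union_by_name signature; the catalog name and "ns.events" table are made up, while the update_schema() context-manager flow is pyiceberg's usual schema-evolution API:

# Hypothetical usage; "default" catalog and "ns.events" are illustrative.
import pyarrow as pa

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
table = catalog.load_table("ns.events")

incoming = pa.schema([("ts_ns", pa.timestamp("ns"))])

with table.update_schema() as update:
    # format_version=3 lets timestamp[ns] columns convert to Iceberg's V3
    # timestamp_ns instead of being rejected during conversion.
    update.union_by_name(incoming, format_version=3)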

tests/conftest.py

Lines changed: 0 additions & 22 deletions
@@ -2811,28 +2811,6 @@ def arrow_table_schema_with_all_microseconds_timestamp_precisions() -> "pa.Schema":
     )


-@pytest.fixture(scope="session")
-def arrow_table_schema_with_nanoseconds_timestamp_precisions() -> "pa.Schema":
-    """Pyarrow Schema with all microseconds timestamp."""
-    import pyarrow as pa
-
-    return pa.schema(
-        [
-            ("timestamp_s", pa.timestamp(unit="us")),
-            ("timestamptz_s", pa.timestamp(unit="us", tz="UTC")),
-            ("timestamp_ms", pa.timestamp(unit="us")),
-            ("timestamptz_ms", pa.timestamp(unit="us", tz="UTC")),
-            ("timestamp_us", pa.timestamp(unit="us")),
-            ("timestamptz_us", pa.timestamp(unit="us", tz="UTC")),
-            ("timestamp_ns", pa.timestamp(unit="us")),
-            ("timestamptz_ns", pa.timestamp(unit="ns", tz="UTC")),
-            ("timestamptz_us_etc_utc", pa.timestamp(unit="us", tz="UTC")),
-            ("timestamptz_ns_z", pa.timestamp(unit="ns", tz="UTC")),
-            ("timestamptz_s_0000", pa.timestamp(unit="us", tz="UTC")),
-        ]
-    )
-
-
 @pytest.fixture(scope="session")
 def table_schema_with_all_microseconds_timestamp_precision() -> Schema:
     """Iceberg table Schema with only date, timestamp and timestamptz values."""

tests/integration/test_writes/test_writes.py

Lines changed: 13 additions & 11 deletions
@@ -18,6 +18,7 @@
 import math
 import os
 import random
+import re
 import time
 import uuid
 from datetime import date, datetime, timedelta
@@ -44,7 +45,7 @@
 from pyiceberg.catalog.sql import SqlCatalog
 from pyiceberg.exceptions import CommitFailedException, NoSuchTableError
 from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual, In, LessThan, Not
-from pyiceberg.io.pyarrow import _dataframe_to_data_files
+from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _dataframe_to_data_files
 from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import TableProperties
@@ -2249,15 +2250,16 @@ def test_branch_py_write_spark_read(session_catalog: Catalog, spark: SparkSession) -> None:


 @pytest.mark.integration
-def test_nanosecond_support_on_catalog(session_catalog: Catalog) -> None:
+def test_nanosecond_support_on_catalog(session_catalog: Catalog, arrow_table_schema_with_all_timestamp_precisions: pa.Schema) -> None:
     identifier = "default.test_nanosecond_support_on_catalog"
-    # Create a pyarrow table with a nanosecond timestamp column
-    table = pa.Table.from_arrays(
-        [
-            pa.array([datetime.now()], type=pa.timestamp("ns")),
-            pa.array([datetime.now()], type=pa.timestamp("ns", tz="America/New_York")),
-        ],
-        names=["timestamp_ns", "timestamptz_ns"],
-    )

-    _create_table(session_catalog, identifier, {"format-version": "3"}, schema=table.schema)
+    catalog = load_catalog("default", type="in-memory")
+    catalog.create_namespace("ns")
+
+    _create_table(session_catalog, identifier, {"format-version": "3"}, schema=arrow_table_schema_with_all_timestamp_precisions)
+
+    with pytest.raises(NotImplementedError, match="Writing V3 is not yet supported"):
+        catalog.create_table("ns.table1", schema=arrow_table_schema_with_all_timestamp_precisions, properties={"format-version": "3"})
+
+    with pytest.raises(UnsupportedPyArrowTypeException, match=re.escape("Column 'timestamp_ns' has an unsupported type: timestamp[ns]")):
+        _create_table(session_catalog, identifier, {"format-version": "2"}, schema=arrow_table_schema_with_all_timestamp_precisions)
