
Commit 24b12dd

Authored by rambleraptor and Fokko Driesprong
Support reading nanoseconds from PyArrow (#2294)
Closes #2270. Related to #1045.

# Rationale for this change

This allows us to read nanosecond information from PyArrow. Right now, we always downcast to microseconds or throw an error. By passing through the format-version, we can grab nanosecond precision *just for v3 tables*.

# Are these changes tested?

Included a test. I can't do a test involving writing, since we don't support v3 writing yet (there's a PR out for that).

# Are there any user-facing changes?

---------

Co-authored-by: Fokko Driesprong <[email protected]>
1 parent dab2536 commit 24b12dd
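
For readers, a minimal sketch of what this enables end to end, modelled on the integration test at the bottom of this diff; the catalog name and table identifier are placeholders, not part of the commit:

```python
from datetime import datetime

import pyarrow as pa

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")  # any configured catalog

# Arrow table whose columns carry nanosecond precision
arrow_table = pa.Table.from_arrays(
    [
        pa.array([datetime.now()], type=pa.timestamp("ns")),
        pa.array([datetime.now()], type=pa.timestamp("ns", tz="UTC")),
    ],
    names=["timestamp_ns", "timestamptz_ns"],
)

# With format-version 3 the schema conversion keeps nanosecond precision
# (Iceberg timestamp_ns / timestamptz_ns); on a v2 table the same schema
# would still be downcast (if configured) or rejected.
table = catalog.create_table(
    "default.nanosecond_demo",
    schema=arrow_table.schema,
    properties={"format-version": "3"},
)
```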

File tree (6 files changed, +119 / -21 lines)

pyiceberg/catalog/__init__.py
pyiceberg/catalog/rest/__init__.py
pyiceberg/io/pyarrow.py
pyiceberg/table/__init__.py
tests/conftest.py
tests/integration/test_writes/test_writes.py

pyiceberg/catalog/__init__.py

Lines changed: 12 additions & 3 deletions
@@ -70,6 +70,7 @@
     Identifier,
     Properties,
     RecursiveDict,
+    TableVersion,
 )
 from pyiceberg.utils.config import Config, merge_config
 from pyiceberg.utils.properties import property_as_bool
@@ -743,7 +744,9 @@ def _load_file_io(self, properties: Properties = EMPTY_DICT, location: Optional[
         return load_file_io({**self.properties, **properties}, location)
 
     @staticmethod
-    def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema:
+    def _convert_schema_if_needed(
+        schema: Union[Schema, "pa.Schema"], format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION
+    ) -> Schema:
         if isinstance(schema, Schema):
             return schema
         try:
@@ -754,7 +757,10 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema:
             downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
             if isinstance(schema, pa.Schema):
                 schema: Schema = visit_pyarrow(  # type: ignore
-                    schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+                    schema,
+                    _ConvertToIcebergWithoutIDs(
+                        downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version
+                    ),
                 )
             return schema
         except ModuleNotFoundError:
@@ -847,7 +853,10 @@ def _create_staged_table(
         Returns:
             StagedTable: the created staged table instance.
         """
-        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore
+        schema: Schema = self._convert_schema_if_needed(  # type: ignore
+            schema,
+            int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)),  # type: ignore
+        )
 
         database_name, table_name = self.identifier_to_database_and_table(identifier)
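
The staged-table path above shows the pattern used throughout this change: the requested format-version is read from the table properties and threaded into schema conversion. A simplified sketch of that lookup (the helper name below is illustrative, not part of the codebase):

```python
from pyiceberg.table import TableProperties
from pyiceberg.typedef import Properties


def resolve_format_version(properties: Properties) -> int:
    # "format-version" falls back to TableProperties.DEFAULT_FORMAT_VERSION (2)
    return int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION))


assert resolve_format_version({}) == 2
assert resolve_format_version({"format-version": "3"}) == 3
```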

pyiceberg/catalog/rest/__init__.py

Lines changed: 5 additions & 1 deletion
@@ -64,6 +64,7 @@
     StagedTable,
     Table,
     TableIdentifier,
+    TableProperties,
 )
 from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids
@@ -498,7 +499,10 @@ def _create_table(
         properties: Properties = EMPTY_DICT,
         stage_create: bool = False,
     ) -> TableResponse:
-        iceberg_schema = self._convert_schema_if_needed(schema)
+        iceberg_schema = self._convert_schema_if_needed(
+            schema,
+            int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)),  # type: ignore
+        )
         fresh_schema = assign_fresh_schema_ids(iceberg_schema)
         fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, iceberg_schema, fresh_schema)
         fresh_sort_order = assign_fresh_sort_order_ids(sort_order, iceberg_schema, fresh_schema)

pyiceberg/io/pyarrow.py

Lines changed: 48 additions & 12 deletions
@@ -145,12 +145,13 @@
     visit,
     visit_with_partner,
 )
+from pyiceberg.table import TableProperties
 from pyiceberg.table.locations import load_location_provider
 from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping
 from pyiceberg.table.puffin import PuffinFile
 from pyiceberg.transforms import IdentityTransform, TruncateTransform
-from pyiceberg.typedef import EMPTY_DICT, Properties, Record
+from pyiceberg.typedef import EMPTY_DICT, Properties, Record, TableVersion
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -1017,22 +1018,36 @@ def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], start
 
 
 def pyarrow_to_schema(
-    schema: pa.Schema, name_mapping: Optional[NameMapping] = None, downcast_ns_timestamp_to_us: bool = False
+    schema: pa.Schema,
+    name_mapping: Optional[NameMapping] = None,
+    downcast_ns_timestamp_to_us: bool = False,
+    format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
 ) -> Schema:
     has_ids = visit_pyarrow(schema, _HasIds())
     if has_ids:
-        return visit_pyarrow(schema, _ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us))
+        return visit_pyarrow(
+            schema, _ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version)
+        )
     elif name_mapping is not None:
-        schema_without_ids = _pyarrow_to_schema_without_ids(schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+        schema_without_ids = _pyarrow_to_schema_without_ids(
+            schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version
+        )
         return apply_name_mapping(schema_without_ids, name_mapping)
     else:
         raise ValueError(
             "Parquet file does not have field-ids and the Iceberg table does not have 'schema.name-mapping.default' defined"
         )
 
 
-def _pyarrow_to_schema_without_ids(schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False) -> Schema:
-    return visit_pyarrow(schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us))
+def _pyarrow_to_schema_without_ids(
+    schema: pa.Schema,
+    downcast_ns_timestamp_to_us: bool = False,
+    format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
+) -> Schema:
+    return visit_pyarrow(
+        schema,
+        _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version),
+    )
 
 
 def _pyarrow_schema_ensure_large_types(schema: pa.Schema) -> pa.Schema:
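
A rough usage sketch of the widened `pyarrow_to_schema` signature; the `PARQUET:field_id` metadata key is how field IDs are attached on the Arrow side, and the concrete field names and IDs here are illustrative only:

```python
import pyarrow as pa

from pyiceberg.io.pyarrow import pyarrow_to_schema

arrow_schema = pa.schema(
    [
        pa.field("ts_ns", pa.timestamp("ns"), metadata={"PARQUET:field_id": "1"}),
        pa.field("tstz_ns", pa.timestamp("ns", tz="UTC"), metadata={"PARQUET:field_id": "2"}),
    ]
)

# v3: nanosecond columns are preserved as timestamp_ns / timestamptz_ns
schema_v3 = pyarrow_to_schema(arrow_schema, format_version=3)

# default (v2): 'ns' precision still requires downcasting to 'us', otherwise a TypeError is raised
schema_v2 = pyarrow_to_schema(arrow_schema, downcast_ns_timestamp_to_us=True)
```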
@@ -1214,9 +1229,12 @@ class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
 
     _field_names: List[str]
 
-    def __init__(self, downcast_ns_timestamp_to_us: bool = False) -> None:
+    def __init__(
+        self, downcast_ns_timestamp_to_us: bool = False, format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION
+    ) -> None:  # noqa: F821
         self._field_names = []
         self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
+        self._format_version = format_version
 
     def _field_id(self, field: pa.Field) -> int:
         if (field_id := _get_field_id(field)) is not None:
@@ -1287,6 +1305,11 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
         elif primitive.unit == "ns":
             if self._downcast_ns_timestamp_to_us:
                 logger.warning("Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.")
+            elif self._format_version >= 3:
+                if primitive.tz in UTC_ALIASES:
+                    return TimestamptzNanoType()
+                else:
+                    return TimestampNanoType()
             else:
                 raise TypeError(
                     "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write.",
@@ -2519,7 +2542,10 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[List[
 
 
 def _check_pyarrow_schema_compatible(
-    requested_schema: Schema, provided_schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False
+    requested_schema: Schema,
+    provided_schema: pa.Schema,
+    downcast_ns_timestamp_to_us: bool = False,
+    format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
 ) -> None:
     """
     Check if the `requested_schema` is compatible with `provided_schema`.
@@ -2532,10 +2558,15 @@ def _check_pyarrow_schema_compatible(
     name_mapping = requested_schema.name_mapping
     try:
         provided_schema = pyarrow_to_schema(
-            provided_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+            provided_schema,
+            name_mapping=name_mapping,
+            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+            format_version=format_version,
         )
     except ValueError as e:
-        provided_schema = _pyarrow_to_schema_without_ids(provided_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+        provided_schema = _pyarrow_to_schema_without_ids(
+            provided_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version
+        )
         additional_names = set(provided_schema._name_to_id.keys()) - set(requested_schema._name_to_id.keys())
         raise ValueError(
             f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
@@ -2561,7 +2592,7 @@ def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_pa
     )
 
     schema = table_metadata.schema()
-    _check_pyarrow_schema_compatible(schema, arrow_schema)
+    _check_pyarrow_schema_compatible(schema, arrow_schema, format_version=table_metadata.format_version)
 
     statistics = data_file_statistics_from_parquet_metadata(
         parquet_metadata=parquet_metadata,
@@ -2652,7 +2683,12 @@ def _dataframe_to_data_files(
     )
     name_mapping = table_metadata.schema().name_mapping
     downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
-    task_schema = pyarrow_to_schema(df.schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+    task_schema = pyarrow_to_schema(
+        df.schema,
+        name_mapping=name_mapping,
+        downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+        format_version=table_metadata.format_version,
+    )
 
     if table_metadata.spec().is_unpartitioned():
         yield from write_file(

pyiceberg/table/__init__.py

Lines changed: 17 additions & 5 deletions
@@ -219,7 +219,7 @@ class TableProperties:
 
     DEFAULT_NAME_MAPPING = "schema.name-mapping.default"
     FORMAT_VERSION = "format-version"
-    DEFAULT_FORMAT_VERSION = 2
+    DEFAULT_FORMAT_VERSION: TableVersion = 2
 
     MANIFEST_TARGET_SIZE_BYTES = "commit.manifest.target-size-bytes"
     MANIFEST_TARGET_SIZE_BYTES_DEFAULT = 8 * 1024 * 1024  # 8 MB
@@ -477,7 +477,10 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,
         )
         downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
-            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+            self.table_metadata.schema(),
+            provided_schema=df.schema,
+            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+            format_version=self.table_metadata.format_version,
         )
 
         with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
@@ -527,7 +530,10 @@ def dynamic_partition_overwrite(
 
         downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
-            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+            self.table_metadata.schema(),
+            provided_schema=df.schema,
+            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+            format_version=self.table_metadata.format_version,
         )
 
         # If dataframe does not have data, there is no need to overwrite
@@ -593,7 +599,10 @@ def overwrite(
         )
         downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
-            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+            self.table_metadata.schema(),
+            provided_schema=df.schema,
+            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+            format_version=self.table_metadata.format_version,
        )
 
         if overwrite_filter != AlwaysFalse():
@@ -789,7 +798,10 @@ def upsert(
 
         downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
-            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+            self.table_metadata.schema(),
+            provided_schema=df.schema,
+            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+            format_version=self.table_metadata.format_version,
         )
 
         # get list of rows that exist so we don't have to load the entire target table

tests/conftest.py

Lines changed: 22 additions & 0 deletions
@@ -2811,6 +2811,28 @@ def arrow_table_schema_with_all_microseconds_timestamp_precisions() -> "pa.Schem
     )
 
 
+@pytest.fixture(scope="session")
+def arrow_table_schema_with_nanoseconds_timestamp_precisions() -> "pa.Schema":
+    """Pyarrow Schema with all microseconds timestamp."""
+    import pyarrow as pa
+
+    return pa.schema(
+        [
+            ("timestamp_s", pa.timestamp(unit="us")),
+            ("timestamptz_s", pa.timestamp(unit="us", tz="UTC")),
+            ("timestamp_ms", pa.timestamp(unit="us")),
+            ("timestamptz_ms", pa.timestamp(unit="us", tz="UTC")),
+            ("timestamp_us", pa.timestamp(unit="us")),
+            ("timestamptz_us", pa.timestamp(unit="us", tz="UTC")),
+            ("timestamp_ns", pa.timestamp(unit="us")),
+            ("timestamptz_ns", pa.timestamp(unit="ns", tz="UTC")),
+            ("timestamptz_us_etc_utc", pa.timestamp(unit="us", tz="UTC")),
+            ("timestamptz_ns_z", pa.timestamp(unit="ns", tz="UTC")),
+            ("timestamptz_s_0000", pa.timestamp(unit="us", tz="UTC")),
+        ]
+    )
+
+
 @pytest.fixture(scope="session")
 def table_schema_with_all_microseconds_timestamp_precision() -> Schema:
     """Iceberg table Schema with only date, timestamp and timestamptz values."""

tests/integration/test_writes/test_writes.py

Lines changed: 15 additions & 0 deletions
@@ -2246,3 +2246,18 @@ def test_branch_py_write_spark_read(session_catalog: Catalog, spark: SparkSessio
     )
     assert main_df.count() == 3
     assert branch_df.count() == 2
+
+
+@pytest.mark.integration
+def test_nanosecond_support_on_catalog(session_catalog: Catalog) -> None:
+    identifier = "default.test_nanosecond_support_on_catalog"
+    # Create a pyarrow table with a nanosecond timestamp column
+    table = pa.Table.from_arrays(
+        [
+            pa.array([datetime.now()], type=pa.timestamp("ns")),
+            pa.array([datetime.now()], type=pa.timestamp("ns", tz="America/New_York")),
+        ],
+        names=["timestamp_ns", "timestamptz_ns"],
+    )
+
+    _create_table(session_catalog, identifier, {"format-version": "3"}, schema=table.schema)
