Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableResponse, Table
from pyiceberg.table import CommitTableResponse, Table, TableProperties
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
Expand Down Expand Up @@ -181,7 +181,10 @@ def create_table(
ValueError: If the identifier is invalid, or no path is given to store metadata.
"""
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore
schema: Schema = self._convert_schema_if_needed( # type: ignore
schema,
int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)), # type: ignore
)

database_name, table_name = self.identifier_to_database_and_table(identifier)

Expand Down
7 changes: 5 additions & 2 deletions pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableResponse, Table
from pyiceberg.table import CommitTableResponse, Table, TableProperties
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
Expand Down Expand Up @@ -200,7 +200,10 @@ def create_table(
ValueError: If the identifier is invalid, or no path is given to store metadata.
"""
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore
schema: Schema = self._convert_schema_if_needed( # type: ignore
schema,
int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)), # type: ignore
)

namespace_identifier = Catalog.namespace_from(identifier)
table_name = Catalog.table_name_from(identifier)
Expand Down
6 changes: 5 additions & 1 deletion pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1483,6 +1483,7 @@ def _task_to_record_batches(
case_sensitive: bool,
name_mapping: Optional[NameMapping] = None,
partition_spec: Optional[PartitionSpec] = None,
format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
) -> Iterator[pa.RecordBatch]:
arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
with io.new_input(task.file.file_path).open() as fin:
Expand All @@ -1492,7 +1493,9 @@ def _task_to_record_batches(
# Hence it is reasonable to always cast 'ns' timestamp to 'us' on read.
# When V3 support is introduced, we will update `downcast_ns_timestamp_to_us` flag based on
# the table format version.
file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True)
file_schema = pyarrow_to_schema(
physical_schema, name_mapping, downcast_ns_timestamp_to_us=True, format_version=format_version
)

# Apply column projection rules: https://iceberg.apache.org/spec/#column-projection
projected_missing_fields = _get_column_projection_values(
Expand Down Expand Up @@ -1721,6 +1724,7 @@ def _record_batches_from_scan_tasks_and_deletes(
self._case_sensitive,
self._table_metadata.name_mapping(),
self._table_metadata.specs().get(task.file.spec_id),
self._table_metadata.format_version,
)
for batch in batches:
if self._limit is not None:
Expand Down
11 changes: 8 additions & 3 deletions pyiceberg/table/update/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
UpdatesAndRequirements,
UpdateTableMetadata,
)
from pyiceberg.typedef import L
from pyiceberg.typedef import L, TableVersion
from pyiceberg.types import IcebergType, ListType, MapType, NestedField, PrimitiveType, StructType

if TYPE_CHECKING:
Expand Down Expand Up @@ -142,11 +142,16 @@ def case_sensitive(self, case_sensitive: bool) -> UpdateSchema:
self._case_sensitive = case_sensitive
return self

def union_by_name(self, new_schema: Union[Schema, "pa.Schema"]) -> UpdateSchema:
def union_by_name(
# TODO: Move TableProperties.DEFAULT_FORMAT_VERSION to separate file and set that as format_version default.
self,
new_schema: Union[Schema, "pa.Schema"],
format_version: TableVersion = 2,
) -> UpdateSchema:
from pyiceberg.catalog import Catalog

visit_with_partner(
Catalog._convert_schema_if_needed(new_schema),
Catalog._convert_schema_if_needed(new_schema, format_version=format_version),
-1,
_UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive),
# type: ignore
Expand Down
22 changes: 0 additions & 22 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2811,28 +2811,6 @@ def arrow_table_schema_with_all_microseconds_timestamp_precisions() -> "pa.Schem
)


@pytest.fixture(scope="session")
def arrow_table_schema_with_nanoseconds_timestamp_precisions() -> "pa.Schema":
"""Pyarrow Schema with all microseconds timestamp."""
import pyarrow as pa

return pa.schema(
[
("timestamp_s", pa.timestamp(unit="us")),
("timestamptz_s", pa.timestamp(unit="us", tz="UTC")),
("timestamp_ms", pa.timestamp(unit="us")),
("timestamptz_ms", pa.timestamp(unit="us", tz="UTC")),
("timestamp_us", pa.timestamp(unit="us")),
("timestamptz_us", pa.timestamp(unit="us", tz="UTC")),
("timestamp_ns", pa.timestamp(unit="us")),
("timestamptz_ns", pa.timestamp(unit="ns", tz="UTC")),
("timestamptz_us_etc_utc", pa.timestamp(unit="us", tz="UTC")),
("timestamptz_ns_z", pa.timestamp(unit="ns", tz="UTC")),
("timestamptz_s_0000", pa.timestamp(unit="us", tz="UTC")),
]
)


@pytest.fixture(scope="session")
def table_schema_with_all_microseconds_timestamp_precision() -> Schema:
"""Iceberg table Schema with only date, timestamp and timestamptz values."""
Expand Down
32 changes: 21 additions & 11 deletions tests/integration/test_writes/test_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import math
import os
import random
import re
import time
import uuid
from datetime import date, datetime, timedelta
Expand All @@ -44,7 +45,7 @@
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import CommitFailedException, NoSuchTableError
from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual, In, LessThan, Not
from pyiceberg.io.pyarrow import _dataframe_to_data_files
from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _dataframe_to_data_files
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import TableProperties
Expand Down Expand Up @@ -2249,15 +2250,24 @@ def test_branch_py_write_spark_read(session_catalog: Catalog, spark: SparkSessio


@pytest.mark.integration
def test_nanosecond_support_on_catalog(session_catalog: Catalog) -> None:
def test_nanosecond_support_on_catalog(
session_catalog: Catalog, arrow_table_schema_with_all_timestamp_precisions: pa.Schema
) -> None:
identifier = "default.test_nanosecond_support_on_catalog"
# Create a pyarrow table with a nanosecond timestamp column
table = pa.Table.from_arrays(
[
pa.array([datetime.now()], type=pa.timestamp("ns")),
pa.array([datetime.now()], type=pa.timestamp("ns", tz="America/New_York")),
],
names=["timestamp_ns", "timestamptz_ns"],
)

_create_table(session_catalog, identifier, {"format-version": "3"}, schema=table.schema)
catalog = load_catalog("default", type="in-memory")
catalog.create_namespace("ns")

_create_table(session_catalog, identifier, {"format-version": "3"}, schema=arrow_table_schema_with_all_timestamp_precisions)

with pytest.raises(NotImplementedError, match="Writing V3 is not yet supported"):
catalog.create_table(
"ns.table1", schema=arrow_table_schema_with_all_timestamp_precisions, properties={"format-version": "3"}
)

with pytest.raises(
UnsupportedPyArrowTypeException, match=re.escape("Column 'timestamp_ns' has an unsupported type: timestamp[ns]")
):
_create_table(
session_catalog, identifier, {"format-version": "2"}, schema=arrow_table_schema_with_all_timestamp_precisions
)