Add the rest of the _convert_schema_if_needed calls #2300


Open · wants to merge 1 commit into main
7 changes: 5 additions & 2 deletions pyiceberg/catalog/dynamodb.py
@@ -53,7 +53,7 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
- from pyiceberg.table import CommitTableResponse, Table
+ from pyiceberg.table import CommitTableResponse, Table, TableProperties
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -181,7 +181,10 @@ def create_table(
ValueError: If the identifier is invalid, or no path is given to store metadata.
"""
- schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore
+ schema: Schema = self._convert_schema_if_needed(  # type: ignore
+     schema,
+     int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)),  # type: ignore
+ )

database_name, table_name = self.identifier_to_database_and_table(identifier)

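For context (the same change is applied to sql.py below), here is a minimal sketch, outside the diff, of how that property lookup resolves. It assumes TableProperties.FORMAT_VERSION is the "format-version" key and TableProperties.DEFAULT_FORMAT_VERSION is 2, as in current pyiceberg:

    from pyiceberg.table import TableProperties

    # Explicit property wins: the string "3" is coerced to the int 3.
    properties = {"format-version": "3"}
    assert int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)) == 3

    # No property set: fall back to the default format version.
    assert int({}.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)) == 2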
7 changes: 5 additions & 2 deletions pyiceberg/catalog/sql.py
@@ -62,7 +62,7 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
- from pyiceberg.table import CommitTableResponse, Table
+ from pyiceberg.table import CommitTableResponse, Table, TableProperties
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -200,7 +200,10 @@ def create_table(
ValueError: If the identifier is invalid, or no path is given to store metadata.
"""
- schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore
+ schema: Schema = self._convert_schema_if_needed(  # type: ignore
+     schema,
+     int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)),  # type: ignore
+ )

namespace_identifier = Catalog.namespace_from(identifier)
table_name = Catalog.table_name_from(identifier)
6 changes: 5 additions & 1 deletion pyiceberg/io/pyarrow.py
@@ -1453,6 +1453,7 @@ def _task_to_record_batches(
case_sensitive: bool,
name_mapping: Optional[NameMapping] = None,
partition_spec: Optional[PartitionSpec] = None,
+ format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
) -> Iterator[pa.RecordBatch]:
arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
with io.new_input(task.file.file_path).open() as fin:
@@ -1462,7 +1463,9 @@
# Hence it is reasonable to always cast 'ns' timestamp to 'us' on read.
# When V3 support is introduced, we will update `downcast_ns_timestamp_to_us` flag based on
# the table format version.
- file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True)
+ file_schema = pyarrow_to_schema(
+     physical_schema, name_mapping, downcast_ns_timestamp_to_us=True, format_version=format_version
+ )

# Apply column projection rules: https://iceberg.apache.org/spec/#column-projection
projected_missing_fields = _get_column_projection_values(
@@ -1691,6 +1694,7 @@ def _record_batches_from_scan_tasks_and_deletes(
self._case_sensitive,
self._table_metadata.name_mapping(),
self._table_metadata.specs().get(task.file.spec_id),
+ self._table_metadata.format_version,
)
for batch in batches:
if self._limit is not None:
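This threads the table's format version from the scan down into pyarrow_to_schema, which is what decides how nanosecond timestamps are mapped. A hedged sketch of that call in isolation (not from the diff; it assumes pyarrow_to_schema accepts the format_version keyword shown above and that field IDs travel in the "PARQUET:field_id" metadata key):

    import pyarrow as pa
    from pyiceberg.io.pyarrow import pyarrow_to_schema

    # Arrow schema with a nanosecond timestamp column carrying an Iceberg field ID.
    arrow_schema = pa.schema([
        pa.field("event_ts", pa.timestamp("ns"), metadata={"PARQUET:field_id": "1"}),
    ])

    # With a V2 table the ns value has to be downcast to us on read; passing the
    # table's format version lets a V3 table keep nanosecond precision instead.
    print(pyarrow_to_schema(arrow_schema, downcast_ns_timestamp_to_us=True, format_version=2))
    print(pyarrow_to_schema(arrow_schema, downcast_ns_timestamp_to_us=True, format_version=3))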
9 changes: 6 additions & 3 deletions pyiceberg/table/update/schema.py
@@ -34,6 +34,7 @@
visit,
visit_with_partner,
)
+ from pyiceberg.table import TableProperties
from pyiceberg.table.name_mapping import (
NameMapping,
update_mapping,
@@ -48,7 +49,7 @@
UpdatesAndRequirements,
UpdateTableMetadata,
)
- from pyiceberg.typedef import L
+ from pyiceberg.typedef import L, TableVersion
from pyiceberg.types import IcebergType, ListType, MapType, NestedField, PrimitiveType, StructType

if TYPE_CHECKING:
@@ -142,11 +143,13 @@ def case_sensitive(self, case_sensitive: bool) -> UpdateSchema:
self._case_sensitive = case_sensitive
return self

- def union_by_name(self, new_schema: Union[Schema, "pa.Schema"]) -> UpdateSchema:
+ def union_by_name(
+     self, new_schema: Union[Schema, "pa.Schema"], format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION
+ ) -> UpdateSchema:
from pyiceberg.catalog import Catalog

visit_with_partner(
- Catalog._convert_schema_if_needed(new_schema),
+ Catalog._convert_schema_if_needed(new_schema, format_version=format_version),
-1,
_UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive),
# type: ignore
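With the new parameter, callers can union in a pyarrow schema that contains V3-only types such as nanosecond timestamps. A hypothetical usage sketch (column names and the format_version plumbing are illustrative, not taken from the diff); table stands for an already-loaded pyiceberg Table:

    import pyarrow as pa

    # A pyarrow schema with a nanosecond timestamp column to merge into the table.
    new_columns = pa.schema([("event_ts_ns", pa.timestamp("ns"))])

    with table.update_schema() as update:
        # Pass the table's own format version so the pyarrow -> Iceberg conversion
        # knows whether nanosecond timestamps may be kept.
        update.union_by_name(new_columns, format_version=table.metadata.format_version)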
22 changes: 0 additions & 22 deletions tests/conftest.py
@@ -2811,28 +2811,6 @@ def arrow_table_schema_with_all_microseconds_timestamp_precisions() -> "pa.Schema":
)


@pytest.fixture(scope="session")
def arrow_table_schema_with_nanoseconds_timestamp_precisions() -> "pa.Schema":
"""Pyarrow Schema with all microseconds timestamp."""
import pyarrow as pa

return pa.schema(
[
("timestamp_s", pa.timestamp(unit="us")),
("timestamptz_s", pa.timestamp(unit="us", tz="UTC")),
("timestamp_ms", pa.timestamp(unit="us")),
("timestamptz_ms", pa.timestamp(unit="us", tz="UTC")),
("timestamp_us", pa.timestamp(unit="us")),
("timestamptz_us", pa.timestamp(unit="us", tz="UTC")),
("timestamp_ns", pa.timestamp(unit="us")),
("timestamptz_ns", pa.timestamp(unit="ns", tz="UTC")),
("timestamptz_us_etc_utc", pa.timestamp(unit="us", tz="UTC")),
("timestamptz_ns_z", pa.timestamp(unit="ns", tz="UTC")),
("timestamptz_s_0000", pa.timestamp(unit="us", tz="UTC")),
]
)


@pytest.fixture(scope="session")
def table_schema_with_all_microseconds_timestamp_precision() -> Schema:
"""Iceberg table Schema with only date, timestamp and timestamptz values."""
Expand Down
20 changes: 13 additions & 7 deletions tests/integration/test_writes/test_writes.py
@@ -2251,13 +2251,19 @@ def test_branch_py_write_spark_read(session_catalog: Catalog, spark: SparkSession
@pytest.mark.integration
def test_nanosecond_support_on_catalog(session_catalog: Catalog) -> None:
identifier = "default.test_nanosecond_support_on_catalog"
# Create a pyarrow table with a nanosecond timestamp column
table = pa.Table.from_arrays(
[
pa.array([datetime.now()], type=pa.timestamp("ns")),
pa.array([datetime.now()], type=pa.timestamp("ns", tz="America/New_York")),
],
names=["timestamp_ns", "timestamptz_ns"],

catalog = load_catalog("default", type="in-memory")
Contributor: nit: can we use arrow_table_schema_with_nanoseconds_timestamp_precisions here? It's pretty comprehensive.

catalog.create_namespace("ns")

table = pa.Table.from_arrays([pa.array([datetime.now()], type=pa.timestamp("ns"))], names=["timestamps"])
table2 = pa.Table.from_arrays(
[pa.array([datetime.now()], type=pa.timestamp("ns", tz="America/New_York"))], names=["timestamps"]
)

_create_table(session_catalog, identifier, {"format-version": "3"}, schema=table.schema)

with pytest.raises(NotImplementedError, match="Writing V3 is not yet supported"):
catalog.create_table("ns.table1", schema=table.schema, properties={"format-version": "3"})

with pytest.raises(NotImplementedError, match="Writing V3 is not yet supported"):
catalog.create_table("ns.table2", schema=table2.schema, properties={"format-version": "3"})
Contributor comment on lines +2265 to +2269: nit: can we test that the same code that works for v3 doesn't work for v2? Something like:

    with pytest.raises(NotImplementedError, match="..."):
        _create_table(session_catalog, identifier, {"format-version": "2"}, schema=table.schema)
