
Commit bf7a5ef

Add the rest of the _convert_schema_if_needed calls

1 parent 24b12dd

5 files changed: +34, −15 lines

pyiceberg/catalog/dynamodb.py

Lines changed: 5 additions & 2 deletions

```diff
@@ -53,7 +53,7 @@
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableResponse, Table
+from pyiceberg.table import CommitTableResponse, Table, TableProperties
 from pyiceberg.table.locations import load_location_provider
 from pyiceberg.table.metadata import new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -181,7 +181,10 @@ def create_table(
             ValueError: If the identifier is invalid, or no path is given to store metadata.

         """
-        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore
+        schema: Schema = self._convert_schema_if_needed(  # type: ignore
+            schema,
+            int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)),  # type: ignore
+        )

         database_name, table_name = self.identifier_to_database_and_table(identifier)
```
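Both the DynamoDB and SQL catalogs (the latter in the next diff) now resolve the requested format version from the table properties before converting an incoming `pa.Schema`. A minimal, self-contained sketch of that lookup, with an illustrative properties dict:

```python
# The property lookup both catalogs now perform before schema conversion.
# TableProperties.FORMAT_VERSION is the "format-version" key; the default
# applies when the caller does not set it. The dict below is illustrative.
from pyiceberg.table import TableProperties

properties = {"format-version": "3"}
format_version = int(
    properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)
)
assert format_version == 3
```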
pyiceberg/catalog/sql.py

Lines changed: 5 additions & 2 deletions

```diff
@@ -62,7 +62,7 @@
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableResponse, Table
+from pyiceberg.table import CommitTableResponse, Table, TableProperties
 from pyiceberg.table.locations import load_location_provider
 from pyiceberg.table.metadata import new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -200,7 +200,10 @@ def create_table(
             ValueError: If the identifier is invalid, or no path is given to store metadata.

         """
-        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore
+        schema: Schema = self._convert_schema_if_needed(  # type: ignore
+            schema,
+            int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)),  # type: ignore
+        )

         namespace_identifier = Catalog.namespace_from(identifier)
         table_name = Catalog.table_name_from(identifier)
```
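For context, a hedged usage sketch of the path this change affects: handing a `pa.Schema` directly to `create_table` on a `SqlCatalog`. The catalog name, URI, and warehouse location are illustrative; with no `"format-version"` property, `TableProperties.DEFAULT_FORMAT_VERSION` now flows into `_convert_schema_if_needed`.

```python
# Usage sketch; catalog name, URI, and warehouse path are illustrative.
import pyarrow as pa
from pyiceberg.catalog.sql import SqlCatalog

catalog = SqlCatalog("local", uri="sqlite:///:memory:", warehouse="file:///tmp/warehouse")
catalog.create_namespace("ns")

# The pa.Schema is converted internally via _convert_schema_if_needed; since
# no "format-version" property is given, the default format version applies.
arrow_schema = pa.schema([("id", pa.int64()), ("event_ts", pa.timestamp("us"))])
table = catalog.create_table("ns.events", schema=arrow_schema)
```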

pyiceberg/io/pyarrow.py

Lines changed: 5 additions & 1 deletion

```diff
@@ -1453,6 +1453,7 @@ def _task_to_record_batches(
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
     partition_spec: Optional[PartitionSpec] = None,
+    format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
 ) -> Iterator[pa.RecordBatch]:
     arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
     with io.new_input(task.file.file_path).open() as fin:
@@ -1462,7 +1463,9 @@ def _task_to_record_batches(
         # Hence it is reasonable to always cast 'ns' timestamp to 'us' on read.
         # When V3 support is introduced, we will update `downcast_ns_timestamp_to_us` flag based on
         # the table format version.
-        file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True)
+        file_schema = pyarrow_to_schema(
+            physical_schema, name_mapping, downcast_ns_timestamp_to_us=True, format_version=format_version
+        )

         # Apply column projection rules: https://iceberg.apache.org/spec/#column-projection
         projected_missing_fields = _get_column_projection_values(
@@ -1691,6 +1694,7 @@ def _record_batches_from_scan_tasks_and_deletes(
                 self._case_sensitive,
                 self._table_metadata.name_mapping(),
                 self._table_metadata.specs().get(task.file.spec_id),
+                self._table_metadata.format_version,
             )
             for batch in batches:
                 if self._limit is not None:
```
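The read path now threads the table's format version through to `pyarrow_to_schema`. A hedged sketch of that conversion in isolation; the field-id metadata key is the one pyiceberg expects on Parquet-derived Arrow schemas, and the exact V3 treatment of nanosecond precision is assumed from the surrounding V3 work rather than confirmed by this diff:

```python
# Illustrative only: convert an Arrow schema carrying Iceberg field IDs.
# downcast_ns_timestamp_to_us=True mirrors the call in the diff; how
# format_version=3 treats 'ns' precision is an assumption, not confirmed here.
import pyarrow as pa
from pyiceberg.io.pyarrow import pyarrow_to_schema

arrow_schema = pa.schema(
    [pa.field("event_ts", pa.timestamp("ns"), metadata={"PARQUET:field_id": "1"})]
)
iceberg_schema = pyarrow_to_schema(
    arrow_schema, downcast_ns_timestamp_to_us=True, format_version=3
)
print(iceberg_schema)
```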

pyiceberg/table/update/schema.py

Lines changed: 6 additions & 3 deletions

```diff
@@ -34,6 +34,7 @@
     visit,
     visit_with_partner,
 )
+from pyiceberg.table import TableProperties
 from pyiceberg.table.name_mapping import (
     NameMapping,
     update_mapping,
@@ -48,7 +49,7 @@
     UpdatesAndRequirements,
     UpdateTableMetadata,
 )
-from pyiceberg.typedef import L
+from pyiceberg.typedef import L, TableVersion
 from pyiceberg.types import IcebergType, ListType, MapType, NestedField, PrimitiveType, StructType

 if TYPE_CHECKING:
@@ -142,11 +143,13 @@ def case_sensitive(self, case_sensitive: bool) -> UpdateSchema:
         self._case_sensitive = case_sensitive
         return self

-    def union_by_name(self, new_schema: Union[Schema, "pa.Schema"]) -> UpdateSchema:
+    def union_by_name(
+        self, new_schema: Union[Schema, "pa.Schema"], format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION
+    ) -> UpdateSchema:
         from pyiceberg.catalog import Catalog

         visit_with_partner(
-            Catalog._convert_schema_if_needed(new_schema),
+            Catalog._convert_schema_if_needed(new_schema, format_version=format_version),
             -1,
             _UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive),  # type: ignore
```
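`union_by_name` gains the same knob, so schema evolution driven by a `pa.Schema` can honor the table's format version. A hedged sketch, assuming `table` is an already-loaded `Table` handle:

```python
# Sketch: union an Arrow schema into an existing table's schema, passing the
# table's own format version so Arrow-type conversion matches what the table
# supports. `table` is assumed to be an existing Table handle.
import pyarrow as pa

new_arrow_schema = pa.schema([("id", pa.int64()), ("note", pa.string())])
with table.update_schema() as update:
    update.union_by_name(new_arrow_schema, format_version=table.metadata.format_version)
```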

tests/integration/test_writes/test_writes.py

Lines changed: 13 additions & 7 deletions

```diff
@@ -2251,13 +2251,19 @@ def test_branch_py_write_spark_read(session_catalog: Catalog, spark: SparkSessio
 @pytest.mark.integration
 def test_nanosecond_support_on_catalog(session_catalog: Catalog) -> None:
     identifier = "default.test_nanosecond_support_on_catalog"
-    # Create a pyarrow table with a nanosecond timestamp column
-    table = pa.Table.from_arrays(
-        [
-            pa.array([datetime.now()], type=pa.timestamp("ns")),
-            pa.array([datetime.now()], type=pa.timestamp("ns", tz="America/New_York")),
-        ],
-        names=["timestamp_ns", "timestamptz_ns"],
+
+    catalog = load_catalog("default", type="in-memory")
+    catalog.create_namespace("ns")
+
+    table = pa.Table.from_arrays([pa.array([datetime.now()], type=pa.timestamp("ns"))], names=["timestamps"])
+    table2 = pa.Table.from_arrays(
+        [pa.array([datetime.now()], type=pa.timestamp("ns", tz="America/New_York"))], names=["timestamps"]
     )

     _create_table(session_catalog, identifier, {"format-version": "3"}, schema=table.schema)
+
+    with pytest.raises(NotImplementedError, match="Writing V3 is not yet supported"):
+        catalog.create_table("ns.table1", schema=table.schema, properties={"format-version": "3"})
+
+    with pytest.raises(NotImplementedError, match="Writing V3 is not yet supported"):
+        catalog.create_table("ns.table2", schema=table2.schema, properties={"format-version": "3"})
```
