
Commit 2acba74

Use self.table_metadata in transaction (#985)
1 parent 1e6ec0e commit 2acba74
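Context for this change: within a Transaction, self._table.metadata is the table metadata as of the start of the transaction, while the table_metadata property also reflects any updates staged so far in the same transaction. By routing metadata and schema lookups through self.table_metadata, the write paths touched below (upgrade_table_version, update_schema, append, overwrite, delete, add_files) can see changes made earlier in the same transaction, such as a schema evolution. A minimal sketch of what this enables, assuming an already-loaded table tbl; the table contents and column names are illustrative and mirror the tests below:

import pyarrow as pa

# Illustrative PyArrow table whose schema adds a column ("bar") that the
# Iceberg table does not have yet.
pa_table_with_column = pa.table({"foo": ["a", "b"], "bar": [1, 2]})

with tbl.transaction() as txn:
    # Stage a schema change inside the transaction.
    with txn.update_schema() as schema_txn:
        schema_txn.union_by_name(pa_table_with_column.schema)

    # append() now validates df.schema against txn.table_metadata.schema(),
    # which already includes the staged column, rather than against the
    # pre-transaction self._table.schema().
    txn.append(pa_table_with_column)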

File tree

3 files changed: +22 −20 lines

pyiceberg/table/__init__.py

Lines changed: 16 additions & 14 deletions
@@ -331,10 +331,10 @@ def upgrade_table_version(self, format_version: TableVersion) -> Transaction:
         if format_version not in {1, 2}:
             raise ValueError(f"Unsupported table format version: {format_version}")

-        if format_version < self._table.metadata.format_version:
-            raise ValueError(f"Cannot downgrade v{self._table.metadata.format_version} table to v{format_version}")
+        if format_version < self.table_metadata.format_version:
+            raise ValueError(f"Cannot downgrade v{self.table_metadata.format_version} table to v{format_version}")

-        if format_version > self._table.metadata.format_version:
+        if format_version > self.table_metadata.format_version:
             return self._apply((UpgradeFormatVersionUpdate(format_version=format_version),))

         return self
@@ -452,7 +452,7 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive
             self,
             allow_incompatible_changes=allow_incompatible_changes,
             case_sensitive=case_sensitive,
-            name_mapping=self._table.name_mapping(),
+            name_mapping=self.table_metadata.name_mapping(),
         )

     def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> UpdateSnapshot:
@@ -489,7 +489,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
         )
         downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
-            self._table.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
         )

         manifest_merge_enabled = PropertyUtil.property_as_bool(
@@ -504,7 +504,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
         # skip writing data files if the dataframe is empty
         if df.shape[0] > 0:
             data_files = _dataframe_to_data_files(
-                table_metadata=self._table.metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
+                table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
             )
             for data_file in data_files:
                 append_files.append_data_file(data_file)
@@ -548,7 +548,7 @@ def overwrite(
         )
         downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
-            self._table.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
         )

         self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties)
@@ -557,7 +557,7 @@ def overwrite(
         # skip writing data files if the dataframe is empty
         if df.shape[0] > 0:
             data_files = _dataframe_to_data_files(
-                table_metadata=self._table.metadata, write_uuid=update_snapshot.commit_uuid, df=df, io=self._table.io
+                table_metadata=self.table_metadata, write_uuid=update_snapshot.commit_uuid, df=df, io=self._table.io
             )
             for data_file in data_files:
                 update_snapshot.append_data_file(data_file)
@@ -595,7 +595,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti

         # Check if there are any files that require an actual rewrite of a data file
         if delete_snapshot.rewrites_needed is True:
-            bound_delete_filter = bind(self._table.schema(), delete_filter, case_sensitive=True)
+            bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive=True)
             preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter)

             files = self._scan(row_filter=delete_filter).plan_files()
@@ -614,7 +614,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti
             for original_file in files:
                 df = project_table(
                     tasks=[original_file],
-                    table_metadata=self._table.metadata,
+                    table_metadata=self.table_metadata,
                     io=self._table.io,
                     row_filter=AlwaysTrue(),
                     projected_schema=self.table_metadata.schema(),
@@ -629,7 +629,7 @@ def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properti
                 _dataframe_to_data_files(
                     io=self._table.io,
                     df=filtered_df,
-                    table_metadata=self._table.metadata,
+                    table_metadata=self.table_metadata,
                     write_uuid=commit_uuid,
                     counter=counter,
                 )
@@ -658,11 +658,13 @@ def add_files(self, file_paths: List[str], snapshot_properties: Dict[str, str] =
         Raises:
             FileNotFoundError: If the file does not exist.
         """
-        if self._table.name_mapping() is None:
-            self.set_properties(**{TableProperties.DEFAULT_NAME_MAPPING: self._table.schema().name_mapping.model_dump_json()})
+        if self.table_metadata.name_mapping() is None:
+            self.set_properties(**{
+                TableProperties.DEFAULT_NAME_MAPPING: self.table_metadata.schema().name_mapping.model_dump_json()
+            })
         with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
             data_files = _parquet_files_to_data_files(
-                table_metadata=self._table.metadata, file_paths=file_paths, io=self._table.io
+                table_metadata=self.table_metadata, file_paths=file_paths, io=self._table.io
             )
             for data_file in data_files:
                 update_snapshot.append_data_file(data_file)
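add_files follows the same pattern: the default name mapping and the data-file conversion are now derived from the transaction's staged metadata rather than the pre-transaction table. A small sketch, assuming pre-existing Parquet files whose schema matches the table; the file path is hypothetical:

with tbl.transaction() as txn:
    # If the table has no name mapping yet, one is generated from
    # txn.table_metadata.schema() and stored under
    # TableProperties.DEFAULT_NAME_MAPPING before the files are registered.
    txn.add_files(file_paths=["s3://warehouse/extra/part-00000.parquet"])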

tests/catalog/test_sql.py

Lines changed: 3 additions & 3 deletions
@@ -1421,9 +1421,9 @@ def test_write_and_evolve(catalog: SqlCatalog, format_version: int) -> None:
         with txn.update_schema() as schema_txn:
             schema_txn.union_by_name(pa_table_with_column.schema)

-        with txn.update_snapshot().fast_append() as snapshot_update:
-            for data_file in _dataframe_to_data_files(table_metadata=txn.table_metadata, df=pa_table_with_column, io=tbl.io):
-                snapshot_update.append_data_file(data_file)
+        txn.append(pa_table_with_column)
+        txn.overwrite(pa_table_with_column)
+        txn.delete("foo = 'a'")


 @pytest.mark.parametrize(

tests/integration/test_writes/test_writes.py

Lines changed: 3 additions & 3 deletions
@@ -718,9 +718,9 @@ def test_write_and_evolve(session_catalog: Catalog, format_version: int) -> None
         with txn.update_schema() as schema_txn:
             schema_txn.union_by_name(pa_table_with_column.schema)

-        with txn.update_snapshot().fast_append() as snapshot_update:
-            for data_file in _dataframe_to_data_files(table_metadata=txn.table_metadata, df=pa_table_with_column, io=tbl.io):
-                snapshot_update.append_data_file(data_file)
+        txn.append(pa_table_with_column)
+        txn.overwrite(pa_table_with_column)
+        txn.delete("foo = 'a'")


 @pytest.mark.integration
