Skip to content

Commit edfadd9

Browse files
committed
Fix the requirement
1 parent cd19f80 commit edfadd9

File tree

2 files changed

+23
-12
lines changed

2 files changed

+23
-12
lines changed

pyiceberg/table/__init__.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,8 @@ class TableProperties:
243243
WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT = 0
244244

245245
DELETE_MODE = "write.delete.mode"
246+
DELETE_MODE_COPY_ON_WRITE = "merge-on-read"
247+
DELETE_MODE_MERGE_ON_READ = "copy-on-write"
246248

247249
DEFAULT_NAME_MAPPING = "schema.name-mapping.default"
248250
FORMAT_VERSION = "format-version"
@@ -301,7 +303,13 @@ def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequ
301303
requirement.validate(self.table_metadata)
302304

303305
self._updates += updates
304-
self._requirements += requirements
306+
307+
# For the requirements, it does not make sense to add a requirement more than once
308+
# For example, you cannot assert that the current schema has two different IDs
309+
existing_requirements = {type(requirement) for requirement in self._requirements}
310+
for new_requirement in requirements:
311+
if type(new_requirement) not in existing_requirements:
312+
self._requirements = self._requirements + requirements
305313

306314
self.table_metadata = update_table_metadata(self.table_metadata, updates)
307315

@@ -461,8 +469,8 @@ def overwrite(
461469
update_snapshot.append_data_file(data_file)
462470

463471
def delete(self, delete_filter: BooleanExpression, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
464-
if (mode := self.table_metadata.properties.get(TableProperties.DELETE_MODE)) and mode != 'copy-on-write':
465-
warnings.warn("PyIceberg only supports copy on write")
472+
if self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_COPY_ON_WRITE) == TableProperties.DELETE_MODE_MERGE_ON_READ:
473+
raise NotImplementedError("Merge on read is not yet supported")
466474

467475
with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
468476
delete_snapshot.delete_by_predicate(delete_filter)
@@ -2948,7 +2956,7 @@ def _commit(self) -> UpdatesAndRequirements:
29482956
),
29492957
(
29502958
AssertTableUUID(uuid=self._transaction.table_metadata.table_uuid),
2951-
# AssertRefSnapshotId(snapshot_id=self._parent_snapshot_id, ref="main"),
2959+
AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),
29522960
),
29532961
)
29542962

tests/integration/test_rest_schema.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2512,15 +2512,18 @@ def test_two_add_schemas_in_a_single_transaction(catalog: Catalog) -> None:
25122512
),
25132513
)
25142514

2515-
with pytest.raises(CommitFailedException) as exc_info:
2516-
with tbl.transaction() as tr:
2517-
with tr.update_schema() as update:
2518-
update.add_column("bar", field_type=StringType())
2519-
with tr.update_schema() as update:
2520-
update.add_column("baz", field_type=StringType())
2521-
2522-
assert "CommitFailedException: Requirement failed: current schema changed: expected id 1 != 0" in str(exc_info.value)
2515+
with tbl.transaction() as tr:
2516+
with tr.update_schema() as update:
2517+
update.add_column("bar", field_type=StringType())
2518+
with tr.update_schema() as update:
2519+
update.add_column("baz", field_type=StringType())
25232520

2521+
assert tbl.schema().schema_id == 2
2522+
assert tbl.schema() == Schema(
2523+
NestedField(field_id=1, name="foo", field_type=StringType()),
2524+
NestedField(field_id=2, name="bar", field_type=StringType()),
2525+
NestedField(field_id=3, name="baz", field_type=StringType()),
2526+
)
25242527

25252528
@pytest.mark.integration
25262529
def test_create_table_integrity_after_fresh_assignment(catalog: Catalog) -> None:

0 commit comments

Comments
 (0)