Skip to content

Commit 884eca9

Browse files
author
Yingjian Wu
committed
wip
wip
1 parent 7181ae1 commit 884eca9

File tree

4 files changed

+38
-52
lines changed

4 files changed

+38
-52
lines changed

pyiceberg/table/__init__.py

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,9 @@ def _build_partition_predicate(self, partition_records: Set[Record]) -> BooleanE
397397
expr = Or(expr, match_partition_expression)
398398
return expr
399399

400-
def _append_snapshot_producer(self, snapshot_properties: Dict[str, str], branch: Optional[str]) -> _FastAppendFiles:
400+
def _append_snapshot_producer(
401+
self, snapshot_properties: Dict[str, str], branch: Optional[str] = MAIN_BRANCH
402+
) -> _FastAppendFiles:
401403
"""Determine the append type based on table properties.
402404
403405
Args:
@@ -431,19 +433,14 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive
431433
)
432434

433435
def update_snapshot(
434-
self, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = None, stage_only: bool = False
436+
self, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = MAIN_BRANCH
435437
) -> UpdateSnapshot:
436438
"""Create a new UpdateSnapshot to produce a new snapshot for the table.
437439
438440
Returns:
439441
A new UpdateSnapshot
440442
"""
441-
if branch is None:
442-
branch = MAIN_BRANCH
443-
444-
return UpdateSnapshot(
445-
self, io=self._table.io, branch=branch, snapshot_properties=snapshot_properties, stage_only=stage_only
446-
)
443+
return UpdateSnapshot(self, io=self._table.io, branch=branch, snapshot_properties=snapshot_properties)
447444

448445
def update_statistics(self) -> UpdateStatistics:
449446
"""
@@ -454,7 +451,7 @@ def update_statistics(self) -> UpdateStatistics:
454451
"""
455452
return UpdateStatistics(transaction=self)
456453

457-
def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = None) -> None:
454+
def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = MAIN_BRANCH) -> None:
458455
"""
459456
Shorthand API for appending a PyArrow table to a table transaction.
460457
@@ -499,7 +496,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,
499496
append_files.append_data_file(data_file)
500497

501498
def dynamic_partition_overwrite(
502-
self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = None
499+
self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = MAIN_BRANCH
503500
) -> None:
504501
"""
505502
Shorthand for overwriting existing partitions with a PyArrow table.
@@ -566,7 +563,7 @@ def overwrite(
566563
overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
567564
snapshot_properties: Dict[str, str] = EMPTY_DICT,
568565
case_sensitive: bool = True,
569-
branch: Optional[str] = None,
566+
branch: Optional[str] = MAIN_BRANCH,
570567
) -> None:
571568
"""
572569
Shorthand for adding a table overwrite with a PyArrow table to the transaction.
@@ -632,7 +629,7 @@ def delete(
632629
delete_filter: Union[str, BooleanExpression],
633630
snapshot_properties: Dict[str, str] = EMPTY_DICT,
634631
case_sensitive: bool = True,
635-
branch: Optional[str] = None,
632+
branch: Optional[str] = MAIN_BRANCH,
636633
) -> None:
637634
"""
638635
Shorthand for deleting record from a table.
@@ -735,7 +732,7 @@ def upsert(
735732
when_matched_update_all: bool = True,
736733
when_not_matched_insert_all: bool = True,
737734
case_sensitive: bool = True,
738-
branch: Optional[str] = None,
735+
branch: Optional[str] = MAIN_BRANCH,
739736
) -> UpsertResult:
740737
"""Shorthand API for performing an upsert to an iceberg table.
741738
@@ -820,7 +817,7 @@ def upsert(
820817
case_sensitive=case_sensitive,
821818
)
822819

823-
if branch is not None:
820+
if branch in self.table_metadata.refs:
824821
matched_iceberg_record_batches_scan = matched_iceberg_record_batches_scan.use_ref(branch)
825822

826823
matched_iceberg_record_batches = matched_iceberg_record_batches_scan.to_arrow_batch_reader()
@@ -1311,7 +1308,7 @@ def upsert(
13111308
when_matched_update_all: bool = True,
13121309
when_not_matched_insert_all: bool = True,
13131310
case_sensitive: bool = True,
1314-
branch: Optional[str] = None,
1311+
branch: Optional[str] = MAIN_BRANCH,
13151312
) -> UpsertResult:
13161313
"""Shorthand API for performing an upsert to an iceberg table.
13171314
@@ -1358,7 +1355,7 @@ def upsert(
13581355
branch=branch,
13591356
)
13601357

1361-
def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = None) -> None:
1358+
def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = MAIN_BRANCH) -> None:
13621359
"""
13631360
Shorthand API for appending a PyArrow table to the table.
13641361
@@ -1371,7 +1368,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,
13711368
tx.append(df=df, snapshot_properties=snapshot_properties, branch=branch)
13721369

13731370
def dynamic_partition_overwrite(
1374-
self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = None
1371+
self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = MAIN_BRANCH
13751372
) -> None:
13761373
"""Shorthand for dynamic overwriting the table with a PyArrow table.
13771374
@@ -1390,7 +1387,7 @@ def overwrite(
13901387
overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
13911388
snapshot_properties: Dict[str, str] = EMPTY_DICT,
13921389
case_sensitive: bool = True,
1393-
branch: Optional[str] = None,
1390+
branch: Optional[str] = MAIN_BRANCH,
13941391
) -> None:
13951392
"""
13961393
Shorthand for overwriting the table with a PyArrow table.
@@ -1423,7 +1420,7 @@ def delete(
14231420
delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
14241421
snapshot_properties: Dict[str, str] = EMPTY_DICT,
14251422
case_sensitive: bool = True,
1426-
branch: Optional[str] = None,
1423+
branch: Optional[str] = MAIN_BRANCH,
14271424
) -> None:
14281425
"""
14291426
Shorthand for deleting rows from the table.

pyiceberg/table/metadata.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,8 +295,10 @@ def new_snapshot_id(self) -> int:
295295

296296
return snapshot_id
297297

298-
def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
298+
def snapshot_by_name(self, name: Optional[str]) -> Optional[Snapshot]:
299299
"""Return the snapshot referenced by the given name or null if no such reference exists."""
300+
if name is None:
301+
name = MAIN_BRANCH
300302
if ref := self.refs.get(name):
301303
return self.snapshot_by_id(ref.snapshot_id)
302304
return None

pyiceberg/table/update/snapshot.py

Lines changed: 15 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,7 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
108108
_manifest_num_counter: itertools.count[int]
109109
_deleted_data_files: Set[DataFile]
110110
_compression: AvroCompressionCodec
111-
_target_branch = MAIN_BRANCH
112-
_stage_only = False
111+
_target_branch: Optional[str]
113112

114113
def __init__(
115114
self,
@@ -118,8 +117,7 @@ def __init__(
118117
io: FileIO,
119118
commit_uuid: Optional[uuid.UUID] = None,
120119
snapshot_properties: Dict[str, str] = EMPTY_DICT,
121-
branch: str = MAIN_BRANCH,
122-
stage_only: bool = False,
120+
branch: Optional[str] = MAIN_BRANCH,
123121
) -> None:
124122
super().__init__(transaction)
125123
self.commit_uuid = commit_uuid or uuid.uuid4()
@@ -139,16 +137,14 @@ def __init__(
139137
self._parent_snapshot_id = (
140138
snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.snapshot_by_name(self._target_branch)) else None
141139
)
142-
self._stage_only = stage_only
143140

144-
def _validate_target_branch(self, branch: str) -> str:
141+
def _validate_target_branch(self, branch: Optional[str]) -> Optional[str]:
145142
# Default is already set to MAIN_BRANCH. So branch name can't be None.
146-
if branch is None:
147-
raise ValueError("Invalid branch name: null")
148-
if branch in self._transaction.table_metadata.refs:
149-
ref = self._transaction.table_metadata.refs[branch]
150-
if ref.snapshot_ref_type != SnapshotRefType.BRANCH:
151-
raise ValueError(f"{branch} is a tag, not a branch. Tags cannot be targets for producing snapshots")
143+
if branch is not None:
144+
if branch in self._transaction.table_metadata.refs:
145+
ref = self._transaction.table_metadata.refs[branch]
146+
if ref.snapshot_ref_type != SnapshotRefType.BRANCH:
147+
raise ValueError(f"{branch} is a tag, not a branch. Tags cannot be targets for producing snapshots")
152148
return branch
153149

154150
def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]:
@@ -297,7 +293,7 @@ def _commit(self) -> UpdatesAndRequirements:
297293

298294
add_snapshot_update = AddSnapshotUpdate(snapshot=snapshot)
299295

300-
if self._stage_only:
296+
if self._target_branch is None:
301297
return (
302298
(add_snapshot_update,),
303299
(),
@@ -368,12 +364,11 @@ def __init__(
368364
operation: Operation,
369365
transaction: Transaction,
370366
io: FileIO,
371-
branch: str,
367+
branch: Optional[str] = MAIN_BRANCH,
372368
commit_uuid: Optional[uuid.UUID] = None,
373369
snapshot_properties: Dict[str, str] = EMPTY_DICT,
374-
stage_only: bool = False,
375370
):
376-
super().__init__(operation, transaction, io, commit_uuid, snapshot_properties, branch, stage_only)
371+
super().__init__(operation, transaction, io, commit_uuid, snapshot_properties, branch)
377372
self._predicate = AlwaysFalse()
378373
self._case_sensitive = True
379374

@@ -539,14 +534,13 @@ def __init__(
539534
operation: Operation,
540535
transaction: Transaction,
541536
io: FileIO,
542-
branch: str,
537+
branch: Optional[str] = MAIN_BRANCH,
543538
commit_uuid: Optional[uuid.UUID] = None,
544539
snapshot_properties: Dict[str, str] = EMPTY_DICT,
545-
stage_only: bool = False,
546540
) -> None:
547541
from pyiceberg.table import TableProperties
548542

549-
super().__init__(operation, transaction, io, commit_uuid, snapshot_properties, branch, stage_only)
543+
super().__init__(operation, transaction, io, commit_uuid, snapshot_properties, branch)
550544
self._target_size_bytes = property_as_int(
551545
self._transaction.table_metadata.properties,
552546
TableProperties.MANIFEST_TARGET_SIZE_BYTES,
@@ -661,23 +655,20 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]:
661655
class UpdateSnapshot:
662656
_transaction: Transaction
663657
_io: FileIO
664-
_branch: str
665-
_stage_only: bool
658+
_branch: Optional[str]
666659
_snapshot_properties: Dict[str, str]
667660

668661
def __init__(
669662
self,
670663
transaction: Transaction,
671664
io: FileIO,
672-
branch: str,
673-
stage_only: bool = False,
665+
branch: Optional[str] = MAIN_BRANCH,
674666
snapshot_properties: Dict[str, str] = EMPTY_DICT,
675667
) -> None:
676668
self._transaction = transaction
677669
self._io = io
678670
self._snapshot_properties = snapshot_properties
679671
self._branch = branch
680-
self._stage_only = stage_only
681672

682673
def fast_append(self) -> _FastAppendFiles:
683674
return _FastAppendFiles(
@@ -686,7 +677,6 @@ def fast_append(self) -> _FastAppendFiles:
686677
io=self._io,
687678
branch=self._branch,
688679
snapshot_properties=self._snapshot_properties,
689-
stage_only=self._stage_only,
690680
)
691681

692682
def merge_append(self) -> _MergeAppendFiles:
@@ -696,7 +686,6 @@ def merge_append(self) -> _MergeAppendFiles:
696686
io=self._io,
697687
branch=self._branch,
698688
snapshot_properties=self._snapshot_properties,
699-
stage_only=self._stage_only,
700689
)
701690

702691
def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> _OverwriteFiles:
@@ -709,7 +698,6 @@ def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> _OverwriteFiles:
709698
io=self._io,
710699
branch=self._branch,
711700
snapshot_properties=self._snapshot_properties,
712-
stage_only=self._stage_only,
713701
)
714702

715703
def delete(self) -> _DeleteFiles:
@@ -719,7 +707,6 @@ def delete(self) -> _DeleteFiles:
719707
io=self._io,
720708
branch=self._branch,
721709
snapshot_properties=self._snapshot_properties,
722-
stage_only=self._stage_only,
723710
)
724711

725712

tests/integration/test_writes/test_writes.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2287,7 +2287,7 @@ def test_stage_only_delete(
22872287
assert len(files_to_delete) > 0
22882288

22892289
with tbl.transaction() as txn:
2290-
with txn.update_snapshot(stage_only=True).delete() as delete:
2290+
with txn.update_snapshot(branch=None).delete() as delete:
22912291
delete.delete_by_predicate(EqualTo("int", 9))
22922292

22932293
# a new delete snapshot is added
@@ -2324,7 +2324,7 @@ def test_stage_only_fast_append(
23242324
assert original_count == 3
23252325

23262326
with tbl.transaction() as txn:
2327-
with txn.update_snapshot(stage_only=True).fast_append() as fast_append:
2327+
with txn.update_snapshot(branch=None).fast_append() as fast_append:
23282328
for data_file in _dataframe_to_data_files(
23292329
table_metadata=txn.table_metadata, df=arrow_table_with_null, io=txn._table.io
23302330
):
@@ -2364,7 +2364,7 @@ def test_stage_only_merge_append(
23642364
assert original_count == 3
23652365

23662366
with tbl.transaction() as txn:
2367-
with txn.update_snapshot(stage_only=True).merge_append() as merge_append:
2367+
with txn.update_snapshot(branch=None).merge_append() as merge_append:
23682368
for data_file in _dataframe_to_data_files(
23692369
table_metadata=txn.table_metadata, df=arrow_table_with_null, io=txn._table.io
23702370
):
@@ -2409,7 +2409,7 @@ def test_stage_only_overwrite_files(
24092409
assert len(files_to_delete) > 0
24102410

24112411
with tbl.transaction() as txn:
2412-
with txn.update_snapshot(stage_only=True).overwrite() as overwrite:
2412+
with txn.update_snapshot(branch=None).overwrite() as overwrite:
24132413
for data_file in _dataframe_to_data_files(
24142414
table_metadata=txn.table_metadata, df=arrow_table_with_null, io=txn._table.io
24152415
):

0 commit comments

Comments
 (0)