
Commit 3a942cf

Author: Yingjian Wu
Commit message: add test
Parent: 884eca9

2 files changed (+91, −24 lines)

pyiceberg/table/update/snapshot.py

Lines changed: 1 addition & 1 deletion

@@ -139,7 +139,7 @@ def __init__(
         )
 
     def _validate_target_branch(self, branch: Optional[str]) -> Optional[str]:
-        # Default is already set to MAIN_BRANCH. So branch name can't be None.
+        # If branch is None, the write is staged as a snapshot (not committed to any branch).
         if branch is not None:
            if branch in self._transaction.table_metadata.refs:
                ref = self._transaction.table_metadata.refs[branch]
tests/integration/test_writes/test_writes.py

Lines changed: 90 additions & 23 deletions
@@ -2293,20 +2293,36 @@ def test_stage_only_delete(
     # a new delete snapshot is added
     snapshots = tbl.snapshots()
     assert len(snapshots) == 2
+    # snapshot main ref has not changed
+    assert current_snapshot == tbl.metadata.current_snapshot_id
+    assert len(tbl.scan().to_arrow()) == original_count
+
+    # Write to main branch
+    with tbl.transaction() as txn:
+        with txn.update_snapshot().fast_append() as fast_append:
+            for data_file in _dataframe_to_data_files(
+                table_metadata=txn.table_metadata, df=arrow_table_with_null, io=txn._table.io
+            ):
+                fast_append.append_data_file(data_file=data_file)
+
+    # Main ref has changed
+    assert current_snapshot != tbl.metadata.current_snapshot_id
+    assert len(tbl.scan().to_arrow()) == 3
+    snapshots = tbl.snapshots()
+    assert len(snapshots) == 3

     rows = spark.sql(
         f"""
-        SELECT operation, summary
-        FROM {identifier}.snapshots
-        ORDER BY committed_at ASC
-        """
+        SELECT operation, parent_id
+        FROM {identifier}.snapshots
+        ORDER BY committed_at ASC
+        """
     ).collect()
     operations = [row.operation for row in rows]
-    assert operations == ["append", "delete"]
-
-    # snapshot main ref has not changed
-    assert current_snapshot == tbl.metadata.current_snapshot_id
-    assert len(tbl.scan().to_arrow()) == original_count
+    parent_snapshot_id = [row.parent_id for row in rows]
+    assert operations == ["append", "delete", "append"]
+    # both subsequent parent ids should be the first snapshot id
+    assert parent_snapshot_id == [None, current_snapshot, current_snapshot]


 @pytest.mark.integration
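
The parent_id assertion above is the crux of the new coverage: after the staged delete and the follow-up main-branch append, both newer snapshots have the first snapshot as their parent, i.e. the staged snapshot never becomes an ancestor of main. As a hedged aside (not part of the diff), the same check can be done straight from pyiceberg metadata without Spark, assuming `tbl` and `current_snapshot` as in the test:

# Sort snapshots by commit time and compare their parent ids directly.
snaps = sorted(tbl.snapshots(), key=lambda s: s.timestamp_ms)
assert [s.parent_snapshot_id for s in snaps] == [None, current_snapshot, current_snapshot]
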
@@ -2323,6 +2339,7 @@ def test_stage_only_fast_append(
     original_count = len(tbl.scan().to_arrow())
     assert original_count == 3

+    # Write to staging branch
     with tbl.transaction() as txn:
         with txn.update_snapshot(branch=None).fast_append() as fast_append:
             for data_file in _dataframe_to_data_files(
@@ -2333,20 +2350,37 @@ def test_stage_only_fast_append(
     # Main ref has not changed and data is not yet appended
     assert current_snapshot == tbl.metadata.current_snapshot_id
     assert len(tbl.scan().to_arrow()) == original_count
-
     # There should be a new staged snapshot
     snapshots = tbl.snapshots()
     assert len(snapshots) == 2

+    # Write to main branch
+    with tbl.transaction() as txn:
+        with txn.update_snapshot().fast_append() as fast_append:
+            for data_file in _dataframe_to_data_files(
+                table_metadata=txn.table_metadata, df=arrow_table_with_null, io=txn._table.io
+            ):
+                fast_append.append_data_file(data_file=data_file)
+
+    # Main ref has changed
+    assert current_snapshot != tbl.metadata.current_snapshot_id
+    assert len(tbl.scan().to_arrow()) == 6
+    snapshots = tbl.snapshots()
+    assert len(snapshots) == 3
+
     rows = spark.sql(
         f"""
-        SELECT operation, summary
+        SELECT operation, parent_id
         FROM {identifier}.snapshots
         ORDER BY committed_at ASC
         """
     ).collect()
     operations = [row.operation for row in rows]
-    assert operations == ["append", "append"]
+    parent_snapshot_id = [row.parent_id for row in rows]
+    assert operations == ["append", "append", "append"]
+    # both subsequent parent ids should be the first snapshot id
+    assert parent_snapshot_id == [None, current_snapshot, current_snapshot]
+


 @pytest.mark.integration
@@ -2378,15 +2412,32 @@ def test_stage_only_merge_append(
     snapshots = tbl.snapshots()
     assert len(snapshots) == 2

+    # Write to main branch
+    with tbl.transaction() as txn:
+        with txn.update_snapshot().fast_append() as fast_append:
+            for data_file in _dataframe_to_data_files(
+                table_metadata=txn.table_metadata, df=arrow_table_with_null, io=txn._table.io
+            ):
+                fast_append.append_data_file(data_file=data_file)
+
+    # Main ref has changed
+    assert current_snapshot != tbl.metadata.current_snapshot_id
+    assert len(tbl.scan().to_arrow()) == 6
+    snapshots = tbl.snapshots()
+    assert len(snapshots) == 3
+
     rows = spark.sql(
         f"""
-        SELECT operation, summary
-        FROM {identifier}.snapshots
-        ORDER BY committed_at ASC
-        """
+        SELECT operation, parent_id
+        FROM {identifier}.snapshots
+        ORDER BY committed_at ASC
+        """
     ).collect()
     operations = [row.operation for row in rows]
-    assert operations == ["append", "append"]
+    parent_snapshot_id = [row.parent_id for row in rows]
+    assert operations == ["append", "append", "append"]
+    # both subsequent parent ids should be the first snapshot id
+    assert parent_snapshot_id == [None, current_snapshot, current_snapshot]


 @pytest.mark.integration
@@ -2418,16 +2469,32 @@ def test_stage_only_overwrite_files(

     assert current_snapshot == tbl.metadata.current_snapshot_id
     assert len(tbl.scan().to_arrow()) == original_count
-
     snapshots = tbl.snapshots()
     assert len(snapshots) == 2

+    # Write to main branch
+    with tbl.transaction() as txn:
+        with txn.update_snapshot().fast_append() as fast_append:
+            for data_file in _dataframe_to_data_files(
+                table_metadata=txn.table_metadata, df=arrow_table_with_null, io=txn._table.io
+            ):
+                fast_append.append_data_file(data_file=data_file)
+
+    # Main ref has changed
+    assert current_snapshot != tbl.metadata.current_snapshot_id
+    assert len(tbl.scan().to_arrow()) == 6
+    snapshots = tbl.snapshots()
+    assert len(snapshots) == 3
+
     rows = spark.sql(
         f"""
-        SELECT operation, summary
-        FROM {identifier}.snapshots
-        ORDER BY committed_at ASC
-        """
+        SELECT operation, parent_id
+        FROM {identifier}.snapshots
+        ORDER BY committed_at ASC
+        """
     ).collect()
     operations = [row.operation for row in rows]
-    assert operations == ["append", "overwrite"]
+    parent_snapshot_id = [row.parent_id for row in rows]
+    assert operations == ["append", "overwrite", "append"]
+    # both subsequent parent ids should be the first snapshot id
+    assert parent_snapshot_id == [None, current_snapshot, current_snapshot]
