Skip to content

Commit 94ce205

Browse files
rambleraptor and Fokko
authored
Row lineage fields for v3 (#2129)
Closes #1821

# Rationale for this change
This adds the proper row-lineage fields for v3. Row lineage is enforced for v3, so all of this is done by default.

# Are these changes tested?
Tests are included. I'd like to include an integration test, but we don't currently allow writing v3 manifest files.

# Are there any user-facing changes?
Adds row-lineage fields.

---------
Co-authored-by: Fokko Driesprong <[email protected]>
1 parent 924ee73 commit 94ce205

File tree

6 files changed

+164
-2
lines changed

6 files changed

+164
-2
lines changed

pyiceberg/table/snapshots.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,12 @@ class Snapshot(IcebergBaseModel):
244244
manifest_list: str = Field(alias="manifest-list", description="Location of the snapshot's manifest list file")
245245
summary: Optional[Summary] = Field(default=None)
246246
schema_id: Optional[int] = Field(alias="schema-id", default=None)
247+
first_row_id: Optional[int] = Field(
248+
alias="first-row-id", default=None, description="assigned to the first row in the first data file in the first manifest"
249+
)
250+
added_rows: Optional[int] = Field(
251+
alias="added-rows", default=None, description="The upper bound of the number of rows with assigned row IDs"
252+
)
247253

248254
def __str__(self) -> str:
249255
"""Return the string representation of the Snapshot class."""
@@ -253,6 +259,22 @@ def __str__(self) -> str:
253259
result_str = f"{operation}id={self.snapshot_id}{parent_id}{schema_id}"
254260
return result_str
255261

262+
def __repr__(self) -> str:
263+
"""Return the string representation of the Snapshot class."""
264+
fields = [
265+
f"snapshot_id={self.snapshot_id}",
266+
f"parent_snapshot_id={self.parent_snapshot_id}",
267+
f"sequence_number={self.sequence_number}",
268+
f"timestamp_ms={self.timestamp_ms}",
269+
f"manifest_list='{self.manifest_list}'",
270+
f"summary={repr(self.summary)}" if self.summary else None,
271+
f"schema_id={self.schema_id}" if self.schema_id is not None else None,
272+
f"first_row_id={self.first_row_id}" if self.first_row_id is not None else None,
273+
f"added_rows={self.added_rows}" if self.added_rows is not None else None,
274+
]
275+
filtered_fields = [field for field in fields if field is not None]
276+
return f"Snapshot({', '.join(filtered_fields)})"
277+
256278
def manifests(self, io: FileIO) -> List[ManifestFile]:
257279
"""Return the manifests for the given snapshot."""
258280
return list(_manifests(io, self.manifest_list))

pyiceberg/table/update/__init__.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -437,13 +437,29 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe
437437
f"Cannot add snapshot with sequence number {update.snapshot.sequence_number} "
438438
f"older than last sequence number {base_metadata.last_sequence_number}"
439439
)
440+
elif base_metadata.format_version >= 3 and update.snapshot.first_row_id is None:
441+
raise ValueError("Cannot add snapshot without first row id")
442+
elif (
443+
base_metadata.format_version >= 3
444+
and update.snapshot.first_row_id is not None
445+
and base_metadata.next_row_id is not None
446+
and update.snapshot.first_row_id < base_metadata.next_row_id
447+
):
448+
raise ValueError(
449+
f"Cannot add a snapshot with first row id smaller than the table's next-row-id {update.snapshot.first_row_id} < {base_metadata.next_row_id}"
450+
)
440451

441452
context.add_update(update)
442453
return base_metadata.model_copy(
443454
update={
444455
"last_updated_ms": update.snapshot.timestamp_ms,
445456
"last_sequence_number": update.snapshot.sequence_number,
446457
"snapshots": base_metadata.snapshots + [update.snapshot],
458+
"next_row_id": base_metadata.next_row_id + update.snapshot.added_rows
459+
if base_metadata.format_version >= 3
460+
and base_metadata.next_row_id is not None
461+
and update.snapshot.added_rows is not None
462+
else None,
447463
}
448464
)
449465

pyiceberg/table/update/snapshot.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,19 @@ def delete_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]:
157157
self._deleted_data_files.add(data_file)
158158
return self
159159

160+
def _calculate_added_rows(self, manifests: List[ManifestFile]) -> int:
161+
"""Calculate the number of added rows from a list of manifest files."""
162+
added_rows = 0
163+
for manifest in manifests:
164+
if manifest.added_snapshot_id is None or manifest.added_snapshot_id == self._snapshot_id:
165+
if manifest.added_rows_count is None:
166+
raise ValueError(
167+
"Cannot determine number of added rows in snapshot because "
168+
f"the entry for manifest {manifest.manifest_path} is missing the field `added-rows-count`"
169+
)
170+
added_rows += manifest.added_rows_count
171+
return added_rows
172+
160173
@abstractmethod
161174
def _deleted_entries(self) -> List[ManifestEntry]: ...
162175

@@ -284,13 +297,19 @@ def _commit(self) -> UpdatesAndRequirements:
284297
) as writer:
285298
writer.add_manifests(new_manifests)
286299

300+
first_row_id: Optional[int] = None
301+
302+
if self._transaction.table_metadata.format_version >= 3:
303+
first_row_id = self._transaction.table_metadata.next_row_id
304+
287305
snapshot = Snapshot(
288306
snapshot_id=self._snapshot_id,
289307
parent_snapshot_id=self._parent_snapshot_id,
290308
manifest_list=manifest_list_file_path,
291309
sequence_number=next_sequence_number,
292310
summary=summary,
293311
schema_id=self._transaction.table_metadata.current_schema_id,
312+
first_row_id=first_row_id,
294313
)
295314

296315
add_snapshot_update = AddSnapshotUpdate(snapshot=snapshot)

tests/conftest.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
from pyiceberg.schema import Accessor, Schema
7373
from pyiceberg.serializers import ToOutputFile
7474
from pyiceberg.table import FileScanTask, Table
75-
from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2
75+
from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2, TableMetadataV3
7676
from pyiceberg.transforms import DayTransform, IdentityTransform
7777
from pyiceberg.types import (
7878
BinaryType,
@@ -920,6 +920,7 @@ def generate_snapshot(
920920
"table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
921921
"location": "s3://bucket/test/location",
922922
"last-sequence-number": 34,
923+
"next-row-id": 1,
923924
"last-updated-ms": 1602638573590,
924925
"last-column-id": 3,
925926
"current-schema-id": 1,
@@ -2489,6 +2490,18 @@ def table_v2(example_table_metadata_v2: Dict[str, Any]) -> Table:
24892490
)
24902491

24912492

2493+
@pytest.fixture
def table_v3(example_table_metadata_v3: Dict[str, Any]) -> Table:
    """Provide a `Table` backed by `TableMetadataV3` parsed from the example v3 metadata dict."""
    metadata = TableMetadataV3(**example_table_metadata_v3)
    metadata_file = f"{metadata.location}/uuid.metadata.json"
    return Table(
        identifier=("database", "table"),
        metadata=metadata,
        metadata_location=metadata_file,
        io=load_file_io(),
        catalog=NoopCatalog("NoopCatalog"),
    )
2503+
2504+
24922505
@pytest.fixture
24932506
def table_v2_orc(example_table_metadata_v2: Dict[str, Any]) -> Table:
24942507
import copy

tests/integration/test_writes/test_writes.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
StringType,
6565
UUIDType,
6666
)
67-
from utils import _create_table
67+
from utils import TABLE_SCHEMA, _create_table
6868

6969

7070
@pytest.fixture(scope="session", autouse=True)
@@ -2490,3 +2490,41 @@ def test_stage_only_overwrite_files(
24902490
assert operations == ["append", "append", "delete", "append", "append"]
24912491

24922492
assert parent_snapshot_id == [None, first_snapshot, second_snapshot, second_snapshot, second_snapshot]
2493+
2494+
2495+
@pytest.mark.skip("V3 writer support is not enabled.")
@pytest.mark.integration
def test_v3_write_and_read_row_lineage(spark: SparkSession, session_catalog: Catalog) -> None:
    """Append three rows to a v3 table and check that `next_row_id` grows by that many rows.

    NOTE(review): `spark` is requested but never used in the body, so nothing
    is read back through Spark here — confirm whether a read-back check is planned.
    """
    table_name = "default.test_v3_write_and_read"
    tbl = _create_table(session_catalog, table_name, {"format-version": "3"})
    assert tbl.format_version == 3, f"Expected v3, got: v{tbl.format_version}"
    # Fall back to 0 when the metadata reports no next_row_id yet.
    row_id_before_append = tbl.metadata.next_row_id or 0

    columns = {
        "bool": [True, False, True],
        "string": ["a", "b", "c"],
        "string_long": ["a_long", "b_long", "c_long"],
        "int": [1, 2, 3],
        "long": [11, 22, 33],
        "float": [1.1, 2.2, 3.3],
        "double": [1.11, 2.22, 3.33],
        "timestamp": [
            datetime(2023, 1, 1, 1, 1, 1),
            datetime(2023, 2, 2, 2, 2, 2),
            datetime(2023, 3, 3, 3, 3, 3),
        ],
        "timestamptz": [
            datetime(2023, 1, 1, 1, 1, 1, tzinfo=pytz.utc),
            datetime(2023, 2, 2, 2, 2, 2, tzinfo=pytz.utc),
            datetime(2023, 3, 3, 3, 3, 3, tzinfo=pytz.utc),
        ],
        "date": [date(2023, 1, 1), date(2023, 2, 2), date(2023, 3, 3)],
        "binary": [b"\x01", b"\x02", b"\x03"],
        "fixed": [b"1234567890123456", b"1234567890123456", b"1234567890123456"],
    }
    test_data = pa.Table.from_pydict(columns, schema=TABLE_SCHEMA.as_arrow())

    tbl.append(test_data)

    expected_next_row_id = row_id_before_append + len(test_data)
    assert tbl.metadata.next_row_id == expected_next_row_id, (
        "Expected next_row_id to be incremented by the number of added rows"
    )

tests/table/test_init.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1521,3 +1521,57 @@ def test_remove_partition_statistics_update_with_invalid_snapshot_id(table_v2_wi
15211521
table_v2_with_statistics.metadata,
15221522
(RemovePartitionStatisticsUpdate(snapshot_id=123456789),),
15231523
)
1524+
1525+
1526+
def test_add_snapshot_update_fails_without_first_row_id(table_v3: Table) -> None:
    """Applying an AddSnapshotUpdate whose snapshot has no `first_row_id` to the v3 metadata raises ValueError."""
    snapshot_without_row_id = Snapshot(
        snapshot_id=25,
        parent_snapshot_id=19,
        sequence_number=200,
        timestamp_ms=1602638593590,
        manifest_list="s3:/a/b/c.avro",
        summary=Summary(Operation.APPEND),
        schema_id=3,
    )
    updates = (AddSnapshotUpdate(snapshot=snapshot_without_row_id),)

    with pytest.raises(ValueError, match="Cannot add snapshot without first row id"):
        update_table_metadata(table_v3.metadata, updates)
1542+
1543+
1544+
def test_add_snapshot_update_fails_with_smaller_first_row_id(table_v3: Table) -> None:
    """Applying an AddSnapshotUpdate with `first_row_id=0` to the v3 metadata raises ValueError.

    NOTE(review): this presumably relies on the fixture's next-row-id being
    greater than 0 — verify against the example v3 metadata.
    """
    stale_snapshot = Snapshot(
        snapshot_id=25,
        parent_snapshot_id=19,
        sequence_number=200,
        timestamp_ms=1602638593590,
        manifest_list="s3:/a/b/c.avro",
        summary=Summary(Operation.APPEND),
        schema_id=3,
        first_row_id=0,
    )
    updates = (AddSnapshotUpdate(snapshot=stale_snapshot),)
    expected_message = "Cannot add a snapshot with first row id smaller than the table's next-row-id"

    with pytest.raises(ValueError, match=expected_message):
        update_table_metadata(table_v3.metadata, updates)
1561+
1562+
1563+
def test_add_snapshot_update_updates_next_row_id(table_v3: Table) -> None:
    """Applying an AddSnapshotUpdate with `added_rows=10` yields metadata whose `next_row_id` is 11."""
    snapshot_with_rows = Snapshot(
        snapshot_id=25,
        parent_snapshot_id=19,
        sequence_number=200,
        timestamp_ms=1602638593590,
        manifest_list="s3:/a/b/c.avro",
        summary=Summary(Operation.APPEND),
        schema_id=3,
        first_row_id=2,
        added_rows=10,
    )
    updates = (AddSnapshotUpdate(snapshot=snapshot_with_rows),)

    updated_metadata = update_table_metadata(table_v3.metadata, updates)

    # NOTE(review): 11 is presumably the fixture's next-row-id (1) plus the
    # 10 added rows — confirm against the example v3 metadata.
    assert updated_metadata.next_row_id == 11

0 commit comments

Comments
 (0)