Skip to content

Commit 9cb3cd5

Browse files
authored
Metadata Log Entries metadata table (#667)
1 parent a6cd0cf commit 9cb3cd5

File tree

6 files changed

+108
-1
lines changed

6 files changed

+108
-1
lines changed

mkdocs/docs/api.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,29 @@ partition_summaries: [[ -- is_valid: all not null
656656
["test"]]]
657657
```
658658

659+
### Metadata Log Entries
660+
661+
To show table metadata log entries:
662+
663+
```python
664+
table.inspect.metadata_log_entries()
665+
```
666+
667+
```
668+
pyarrow.Table
669+
timestamp: timestamp[ms] not null
670+
file: string not null
671+
latest_snapshot_id: int64
672+
latest_schema_id: int32
673+
latest_sequence_number: int64
674+
----
675+
timestamp: [[2024-04-28 17:03:00.214,2024-04-28 17:03:00.352,2024-04-28 17:03:00.445,2024-04-28 17:03:00.498]]
676+
file: [["s3://warehouse/default/table_metadata_log_entries/metadata/00000-0b3b643b-0f3a-4787-83ad-601ba57b7319.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00001-f74e4b2c-0f89-4f55-822d-23d099fd7d54.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00002-97e31507-e4d9-4438-aff1-3c0c5304d271.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00003-6c8b7033-6ad8-4fe4-b64d-d70381aeaddc.metadata.json"]]
677+
latest_snapshot_id: [[null,3958871664825505738,1289234307021405706,7640277914614648349]]
678+
latest_schema_id: [[null,0,0,0]]
679+
latest_sequence_number: [[null,0,0,0]]
680+
```
681+
659682
## Add Files
660683

661684
Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.

pyiceberg/table/__init__.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3845,6 +3845,40 @@ def _partition_summaries_to_rows(
38453845
schema=manifest_schema,
38463846
)
38473847

3848+
def metadata_log_entries(self) -> "pa.Table":
# Return a pyarrow Table with one row per metadata file known for this table:
# every entry of `self.tbl.metadata.metadata_log`, followed by one row for the
# current metadata file (`self.tbl.metadata_location`).
# Columns: `timestamp` (ms, not null), `file` (not null) and the nullable
# `latest_snapshot_id`, `latest_schema_id`, `latest_sequence_number`.
3849+
# pyarrow is imported inside the method rather than at module level
# (presumably so it stays an optional dependency; confirm).
import pyarrow as pa
3850+
3851+
from pyiceberg.table.snapshots import MetadataLogEntry
3852+
3853+
# Explicit schema so column types and nullability do not depend on the row values.
table_schema = pa.schema([
3854+
pa.field("timestamp", pa.timestamp(unit="ms"), nullable=False),
3855+
pa.field("file", pa.string(), nullable=False),
3856+
pa.field("latest_snapshot_id", pa.int64(), nullable=True),
3857+
pa.field("latest_schema_id", pa.int32(), nullable=True),
3858+
pa.field("latest_sequence_number", pa.int64(), nullable=True),
3859+
])
3860+
3861+
# Convert one log entry into a row dict keyed by the schema's column names.
# The snapshot is looked up by the entry's timestamp; when that lookup returns
# a falsy result (e.g. None), the three `latest_*` columns are set to None.
def metadata_log_entry_to_row(metadata_entry: MetadataLogEntry) -> Dict[str, Any]:
3862+
# NOTE(review): presumably the snapshot that was current at this timestamp;
# verify against `snapshot_as_of_timestamp`.
latest_snapshot = self.tbl.snapshot_as_of_timestamp(metadata_entry.timestamp_ms)
3863+
return {
3864+
"timestamp": metadata_entry.timestamp_ms,
3865+
"file": metadata_entry.metadata_file,
3866+
"latest_snapshot_id": latest_snapshot.snapshot_id if latest_snapshot else None,
3867+
"latest_schema_id": latest_snapshot.schema_id if latest_snapshot else None,
3868+
"latest_sequence_number": latest_snapshot.sequence_number if latest_snapshot else None,
3869+
}
3870+
3871+
# similar to MetadataLogEntriesTable in Java
3872+
# https://github.com/apache/iceberg/blob/8a70fe0ff5f241aec8856f8091c77fdce35ad256/core/src/main/java/org/apache/iceberg/MetadataLogEntriesTable.java#L62-L66
3873+
# Append a synthetic entry for the current metadata file, stamped with
# `last_updated_ms`, after the recorded log entries. `+` builds a new list,
# so `metadata_log` itself is not modified.
metadata_log_entries = self.tbl.metadata.metadata_log + [
3874+
MetadataLogEntry(metadata_file=self.tbl.metadata_location, timestamp_ms=self.tbl.metadata.last_updated_ms)
3875+
]
3876+
3877+
# Rows are emitted in list order: log entries first, current metadata file last.
return pa.Table.from_pylist(
3878+
[metadata_log_entry_to_row(entry) for entry in metadata_log_entries],
3879+
schema=table_schema,
3880+
)
3881+
38483882

38493883
@dataclass(frozen=True)
38503884
class TablePartition:

pyiceberg/table/metadata.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,13 @@ def serialize_current_snapshot_id(self, current_snapshot_id: Optional[int]) -> O
311311
return -1
312312
return current_snapshot_id
313313

314+
@field_serializer("snapshots")
315+
def serialize_snapshots(self, snapshots: List[Snapshot]) -> List[Snapshot]:
# Serializer hook for the `snapshots` field.
# For format version 1 it returns copies of the snapshots with
# `sequence_number` set to None; for any other version the list is
# returned unchanged.
316+
# Snapshot field `sequence-number` should not be written for v1 metadata
317+
if self.format_version == 1:
318+
# `model_copy(update=...)` produces new objects, so the snapshots held in
# memory keep their sequence numbers.
# NOTE(review): a None value is only dropped from the output if the model
# is serialized with None-exclusion; confirm the base model's settings.
return [snapshot.model_copy(update={"sequence_number": None}) for snapshot in snapshots]
319+
return snapshots
320+
314321

315322
def _generate_snapshot_id() -> int:
316323
"""Generate a new Snapshot ID from a UUID.

pyiceberg/table/snapshots.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@
5858
CHANGED_PARTITION_PREFIX = "partitions."
5959
OPERATION = "operation"
6060

61+
# Default value for `Snapshot.sequence_number` when no sequence number is supplied.
INITIAL_SEQUENCE_NUMBER = 0
62+
6163

6264
class Operation(Enum):
6365
"""Describes the operation.
@@ -231,7 +233,7 @@ def __eq__(self, other: Any) -> bool:
231233
class Snapshot(IcebergBaseModel):
232234
snapshot_id: int = Field(alias="snapshot-id")
233235
parent_snapshot_id: Optional[int] = Field(alias="parent-snapshot-id", default=None)
234-
sequence_number: Optional[int] = Field(alias="sequence-number", default=None)
236+
sequence_number: Optional[int] = Field(alias="sequence-number", default=INITIAL_SEQUENCE_NUMBER)
235237
timestamp_ms: int = Field(alias="timestamp-ms", default_factory=lambda: int(time.time() * 1000))
236238
manifest_list: Optional[str] = Field(
237239
alias="manifest-list", description="Location of the snapshot's manifest list file", default=None

tests/integration/test_inspect_table.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,3 +528,43 @@ def test_inspect_manifests(spark: SparkSession, session_catalog: Catalog, format
528528
for column in df.column_names:
529529
for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
530530
assert left == right, f"Difference in column {column}: {left} != {right}"
531+
532+
533+
@pytest.mark.integration
534+
@pytest.mark.parametrize("format_version", [1, 2])
535+
def test_inspect_metadata_log_entries(
536+
spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
537+
) -> None:
# Check that `tbl.inspect.metadata_log_entries()` matches Spark's
# `metadata_log_entries` metadata table, for format versions 1 and 2.
538+
from pandas.testing import assert_frame_equal
539+
540+
identifier = "default.table_metadata_log_entries"
541+
# Start from a clean slate: drop the table if an earlier run left it behind.
try:
542+
session_catalog.drop_table(identifier=identifier)
543+
except NoSuchTableError:
544+
pass
545+
546+
tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})
547+
548+
# Write some data
# (three appends, so the metadata log holds several entries)
549+
tbl.append(arrow_table_with_null)
550+
tbl.append(arrow_table_with_null)
551+
tbl.append(arrow_table_with_null)
552+
553+
# Read the same metadata table through pyiceberg and through Spark, then
# compare both as pandas DataFrames.
df = tbl.inspect.metadata_log_entries()
554+
spark_df = spark.sql(f"SELECT * FROM {identifier}.metadata_log_entries")
555+
lhs = df.to_pandas()
556+
rhs = spark_df.toPandas()
557+
558+
# Timestamp in the last row of `metadata_log_entries` table is based on when the table was read
559+
# Therefore, the timestamp of the last row for pyiceberg dataframe and spark dataframe will be different
560+
left_before_last, left_last = lhs[:-1], lhs[-1:]
561+
right_before_last, right_last = rhs[:-1], rhs[-1:]
562+
563+
# compare all rows except for the last row
# (`check_dtype=False`: only the values are compared, not the column dtypes)
564+
assert_frame_equal(left_before_last, right_before_last, check_dtype=False)
565+
# compare the last row, except for the timestamp
566+
for column in df.column_names:
567+
for left, right in zip(left_last[column], right_last[column]):
568+
if column == "timestamp":
569+
continue
570+
assert left == right, f"Difference in column {column}: {left} != {right}"

tests/table/test_snapshots.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ def test_serialize_snapshot_without_sequence_number() -> None:
7777
snapshot = Snapshot(
7878
snapshot_id=25,
7979
parent_snapshot_id=19,
80+
sequence_number=None,
8081
timestamp_ms=1602638573590,
8182
manifest_list="s3:/a/b/c.avro",
8283
summary=Summary(Operation.APPEND),

0 commit comments

Comments
 (0)