Skip to content

Commit 2ee2d19

Browse files
geruhFokko
andauthored
Add Refs metadata table (#602)
* Add Refs metadata table * address the linting issues * add more tests to the output * Change string type to categorical * fix linting after rebasing. * Update tests/integration/test_inspect_table.py --------- Co-authored-by: Fokko Driesprong <[email protected]>
1 parent c69b8d2 commit 2ee2d19

File tree

3 files changed

+111
-0
lines changed

3 files changed

+111
-0
lines changed

mkdocs/docs/api.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,31 @@ readable_metrics: [
581581
[6.0989]]
582582
```
583583

### References

To show a table's known snapshot references:

```python
table.inspect.refs()
```

```
pyarrow.Table
name: string not null
type: string not null
snapshot_id: int64 not null
max_reference_age_in_ms: int64
min_snapshots_to_keep: int32
max_snapshot_age_in_ms: int64
----
name: [["main","testTag"]]
type: [["BRANCH","TAG"]]
snapshot_id: [[2278002651076891950,2278002651076891950]]
max_reference_age_in_ms: [[null,604800000]]
min_snapshots_to_keep: [[null,10]]
max_snapshot_age_in_ms: [[null,604800000]]
```
608+
584609
## Add Files
585610

586611
Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.

pyiceberg/table/__init__.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3423,6 +3423,32 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
34233423
schema=entries_schema,
34243424
)
34253425

3426+
def refs(self) -> "pa.Table":
    """Return the table's snapshot references (branches and tags) as a PyArrow table.

    Columns mirror the Iceberg ``refs`` metadata table: ``name``, ``type``
    (``BRANCH``/``TAG``), ``snapshot_id``, and the nullable retention settings
    ``max_reference_age_in_ms``, ``min_snapshots_to_keep``,
    ``max_snapshot_age_in_ms``.
    """
    import pyarrow as pa

    ref_schema = pa.schema([
        pa.field('name', pa.string(), nullable=False),
        # Dictionary (categorical) encoding: only two possible values, BRANCH or TAG.
        pa.field('type', pa.dictionary(pa.int32(), pa.string()), nullable=False),
        pa.field('snapshot_id', pa.int64(), nullable=False),
        pa.field('max_reference_age_in_ms', pa.int64(), nullable=True),
        pa.field('min_snapshots_to_keep', pa.int32(), nullable=True),
        pa.field('max_snapshot_age_in_ms', pa.int64(), nullable=True),
    ])

    # Iterate name -> ref pairs directly rather than re-looking each key up in
    # the dict; the truthiness guard preserves the original skip of empty entries.
    ref_results = [
        {
            'name': ref_name,
            'type': snapshot_ref.snapshot_ref_type.upper(),
            'snapshot_id': snapshot_ref.snapshot_id,
            'max_reference_age_in_ms': snapshot_ref.max_ref_age_ms,
            'min_snapshots_to_keep': snapshot_ref.min_snapshots_to_keep,
            'max_snapshot_age_in_ms': snapshot_ref.max_snapshot_age_ms,
        }
        for ref_name, snapshot_ref in self.tbl.metadata.refs.items()
        if snapshot_ref
    ]

    return pa.Table.from_pylist(ref_results, schema=ref_schema)
3451+
34263452
def partitions(self, snapshot_id: Optional[int] = None) -> "pa.Table":
34273453
import pyarrow as pa
34283454

tests/integration/test_inspect_table.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,66 @@ def test_inspect_entries_partitioned(spark: SparkSession, session_catalog: Catal
272272
assert df.to_pydict()['data_file'][1]['partition'] == {'dt_day': None, 'dt_month': 612}
273273

274274

275+
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_inspect_refs(
    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
) -> None:
    """Verify ``inspect.refs()`` reports branches and tags and matches Spark's ``refs`` metadata table."""
    identifier = "default.table_metadata_refs"
    tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})

    # write data to create snapshot
    tbl.overwrite(arrow_table_with_null)

    # create a test branch
    spark.sql(
        f"""
    ALTER TABLE {identifier} CREATE BRANCH IF NOT EXISTS testBranch RETAIN 7 DAYS WITH SNAPSHOT RETENTION 2 SNAPSHOTS
    """
    )

    # create a test tag against current snapshot
    current_snapshot = tbl.current_snapshot()
    assert current_snapshot is not None
    current_snapshot_id = current_snapshot.snapshot_id

    spark.sql(
        f"""
    ALTER TABLE {identifier} CREATE TAG testTag AS OF VERSION {current_snapshot_id} RETAIN 180 DAYS
    """
    )

    # refresh() so the tag/branch created via Spark are visible to pyiceberg
    df = tbl.refresh().inspect.refs()

    assert df.column_names == [
        'name',
        'type',
        'snapshot_id',
        'max_reference_age_in_ms',
        'min_snapshots_to_keep',
        'max_snapshot_age_in_ms',
    ]

    assert [name.as_py() for name in df['name']] == ['testBranch', 'main', 'testTag']
    assert [ref_type.as_py() for ref_type in df['type']] == ['BRANCH', 'BRANCH', 'TAG']

    for snapshot_id in df['snapshot_id']:
        assert isinstance(snapshot_id.as_py(), int)

    # Retention columns are nullable: each cell must be either an int or None.
    # (The original `isinstance(...) or not value.as_py()` would also accept any
    # falsy non-int and called as_py() twice; make the contract explicit.)
    for int_column in ['max_reference_age_in_ms', 'min_snapshots_to_keep', 'max_snapshot_age_in_ms']:
        for value in df[int_column]:
            py_value = value.as_py()
            assert py_value is None or isinstance(py_value, int)

    # Cross-check every column against Spark's own refs metadata table.
    lhs = spark.table(f"{identifier}.refs").toPandas()
    rhs = df.to_pandas()
    for column in df.column_names:
        for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
            if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right):
                # NaN != NaN in Python
                continue
            assert left == right, f"Difference in column {column}: {left} != {right}"
333+
334+
275335
@pytest.mark.integration
276336
@pytest.mark.parametrize("format_version", [1, 2])
277337
def test_inspect_partitions_unpartitioned(

0 commit comments

Comments
 (0)