
Commit ee4dd92

Add entries metadata table (#551)

* Add entries metadata table
* lint
* Revert typedef changes
* Remove unrelated changes
* Fix the CI
* Add docs
1 parent d69407c commit ee4dd92

File tree

7 files changed: +582 −31 lines changed

mkdocs/docs/api.md

Lines changed: 159 additions & 1 deletion
@@ -370,7 +370,165 @@ manifest_list: [["s3://warehouse/default/table_metadata_snapshots/metadata/snap-
 summary: [[keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","1","0","3","5459","0","0"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-records",...,"total-equality-deletes","total-files-size","deleted-data-files","deleted-records","removed-files-size"]values:["5459","1","3","1","3",...,"0","5459","1","3","5459"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","2","0","6","10918","0","0"]]]
 ```
 
-### Add Files
+### Entries
+
+To show all the table's current manifest entries for both data and delete files.
+
+```python
+table.inspect.entries()
+```
+
+```
+pyarrow.Table
+status: int8 not null
+snapshot_id: int64 not null
+sequence_number: int64 not null
+file_sequence_number: int64 not null
+data_file: struct<content: int8 not null, file_path: string not null, file_format: string not null, partition: struct<> not null, record_count: int64 not null, file_size_in_bytes: int64 not null, column_sizes: map<int32, int64>, value_counts: map<int32, int64>, null_value_counts: map<int32, int64>, nan_value_counts: map<int32, int64>, lower_bounds: map<int32, binary>, upper_bounds: map<int32, binary>, key_metadata: binary, split_offsets: list<item: int64>, equality_ids: list<item: int32>, sort_order_id: int32> not null
+  child 0, content: int8 not null
+  child 1, file_path: string not null
+  child 2, file_format: string not null
+  child 3, partition: struct<> not null
+  child 4, record_count: int64 not null
+  child 5, file_size_in_bytes: int64 not null
+  child 6, column_sizes: map<int32, int64>
+      child 0, entries: struct<key: int32 not null, value: int64> not null
+          child 0, key: int32 not null
+          child 1, value: int64
+  child 7, value_counts: map<int32, int64>
+      child 0, entries: struct<key: int32 not null, value: int64> not null
+          child 0, key: int32 not null
+          child 1, value: int64
+  child 8, null_value_counts: map<int32, int64>
+      child 0, entries: struct<key: int32 not null, value: int64> not null
+          child 0, key: int32 not null
+          child 1, value: int64
+  child 9, nan_value_counts: map<int32, int64>
+      child 0, entries: struct<key: int32 not null, value: int64> not null
+          child 0, key: int32 not null
+          child 1, value: int64
+  child 10, lower_bounds: map<int32, binary>
+      child 0, entries: struct<key: int32 not null, value: binary> not null
+          child 0, key: int32 not null
+          child 1, value: binary
+  child 11, upper_bounds: map<int32, binary>
+      child 0, entries: struct<key: int32 not null, value: binary> not null
+          child 0, key: int32 not null
+          child 1, value: binary
+  child 12, key_metadata: binary
+  child 13, split_offsets: list<item: int64>
+      child 0, item: int64
+  child 14, equality_ids: list<item: int32>
+      child 0, item: int32
+  child 15, sort_order_id: int32
+readable_metrics: struct<city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string> not null, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null>
+  child 0, city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string> not null
+      child 0, column_size: int64
+      child 1, value_count: int64
+      child 2, null_value_count: int64
+      child 3, nan_value_count: int64
+      child 4, lower_bound: string
+      child 5, upper_bound: string
+  child 1, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null
+      child 0, column_size: int64
+      child 1, value_count: int64
+      child 2, null_value_count: int64
+      child 3, nan_value_count: int64
+      child 4, lower_bound: double
+      child 5, upper_bound: double
+  child 2, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null
+      child 0, column_size: int64
+      child 1, value_count: int64
+      child 2, null_value_count: int64
+      child 3, nan_value_count: int64
+      child 4, lower_bound: double
+      child 5, upper_bound: double
+----
+status: [[1]]
+snapshot_id: [[6245626162224016531]]
+sequence_number: [[1]]
+file_sequence_number: [[1]]
+data_file: [
+  -- is_valid: all not null
+  -- child 0 type: int8
+[0]
+  -- child 1 type: string
+["s3://warehouse/default/cities/data/00000-0-80766b66-e558-4150-a5cf-85e4c609b9fe.parquet"]
+  -- child 2 type: string
+["PARQUET"]
+  -- child 3 type: struct<>
+    -- is_valid: all not null
+  -- child 4 type: int64
+[4]
+  -- child 5 type: int64
+[1656]
+  -- child 6 type: map<int32, int64>
+[keys:[1,2,3]values:[140,135,135]]
+  -- child 7 type: map<int32, int64>
+[keys:[1,2,3]values:[4,4,4]]
+  -- child 8 type: map<int32, int64>
+[keys:[1,2,3]values:[0,0,0]]
+  -- child 9 type: map<int32, int64>
+[keys:[]values:[]]
+  -- child 10 type: map<int32, binary>
+[keys:[1,2,3]values:[416D7374657264616D,8602B68311E34240,3A77BB5E9A9B5EC0]]
+  -- child 11 type: map<int32, binary>
+[keys:[1,2,3]values:[53616E204672616E636973636F,F5BEF1B5678E4A40,304CA60A46651840]]
+  -- child 12 type: binary
+[null]
+  -- child 13 type: list<item: int64>
+[[4]]
+  -- child 14 type: list<item: int32>
+[null]
+  -- child 15 type: int32
+[null]]
+readable_metrics: [
+  -- is_valid: all not null
+  -- child 0 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string>
+    -- is_valid: all not null
+    -- child 0 type: int64
+[140]
+    -- child 1 type: int64
+[4]
+    -- child 2 type: int64
+[0]
+    -- child 3 type: int64
+[null]
+    -- child 4 type: string
+["Amsterdam"]
+    -- child 5 type: string
+["San Francisco"]
+  -- child 1 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>
+    -- is_valid: all not null
+    -- child 0 type: int64
+[135]
+    -- child 1 type: int64
+[4]
+    -- child 2 type: int64
+[0]
+    -- child 3 type: int64
+[null]
+    -- child 4 type: double
+[37.773972]
+    -- child 5 type: double
+[53.11254]
+  -- child 2 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>
+    -- is_valid: all not null
+    -- child 0 type: int64
+[135]
+    -- child 1 type: int64
+[4]
+    -- child 2 type: int64
+[0]
+    -- child 3 type: int64
+[null]
+    -- child 4 type: double
+[-122.431297]
+    -- child 5 type: double
+[6.0989]]
+```
+
+## Add Files
 
 Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
 
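Since `entries()` returns a regular `pyarrow.Table`, the output can be filtered and projected like any other Arrow table. A minimal usage sketch (not part of this commit), assuming `table` is an already-loaded Iceberg table; per the Iceberg spec, `status` is 0 (EXISTING), 1 (ADDED), or 2 (DELETED):

```python
import pyarrow.compute as pc

entries = table.inspect.entries()

# Keep only the manifest entries that were ADDED (status == 1)
added = entries.filter(pc.equal(entries["status"], 1))

# Project down to the columns of interest
print(added.select(["snapshot_id", "sequence_number", "data_file"]))
```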

pyiceberg/table/__init__.py

Lines changed: 124 additions & 0 deletions
@@ -47,6 +47,7 @@
 from typing_extensions import Annotated
 
 import pyiceberg.expressions.parser as parser
+from pyiceberg.conversions import from_bytes
 from pyiceberg.exceptions import CommitFailedException, ResolveError, ValidationError
 from pyiceberg.expressions import (
     AlwaysTrue,
@@ -3264,3 +3265,126 @@ def snapshots(self) -> "pa.Table":
             snapshots,
             schema=snapshots_schema,
         )
+
+    def entries(self) -> "pa.Table":
+        import pyarrow as pa
+
+        from pyiceberg.io.pyarrow import schema_to_pyarrow
+
+        schema = self.tbl.metadata.schema()
+
+        readable_metrics_struct = []
+
+        def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
+            pa_bound_type = schema_to_pyarrow(bound_type)
+            return pa.struct([
+                pa.field("column_size", pa.int64(), nullable=True),
+                pa.field("value_count", pa.int64(), nullable=True),
+                pa.field("null_value_count", pa.int64(), nullable=True),
+                pa.field("nan_value_count", pa.int64(), nullable=True),
+                pa.field("lower_bound", pa_bound_type, nullable=True),
+                pa.field("upper_bound", pa_bound_type, nullable=True),
+            ])
+
+        for field in self.tbl.metadata.schema().fields:
+            readable_metrics_struct.append(
+                pa.field(schema.find_column_name(field.field_id), _readable_metrics_struct(field.field_type), nullable=False)
+            )
+
+        partition_record = self.tbl.metadata.specs_struct()
+        pa_record_struct = schema_to_pyarrow(partition_record)
+
+        entries_schema = pa.schema([
+            pa.field('status', pa.int8(), nullable=False),
+            pa.field('snapshot_id', pa.int64(), nullable=False),
+            pa.field('sequence_number', pa.int64(), nullable=False),
+            pa.field('file_sequence_number', pa.int64(), nullable=False),
+            pa.field(
+                'data_file',
+                pa.struct([
+                    pa.field('content', pa.int8(), nullable=False),
+                    pa.field('file_path', pa.string(), nullable=False),
+                    pa.field('file_format', pa.string(), nullable=False),
+                    pa.field('partition', pa_record_struct, nullable=False),
+                    pa.field('record_count', pa.int64(), nullable=False),
+                    pa.field('file_size_in_bytes', pa.int64(), nullable=False),
+                    pa.field('column_sizes', pa.map_(pa.int32(), pa.int64()), nullable=True),
+                    pa.field('value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True),
+                    pa.field('null_value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True),
+                    pa.field('nan_value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True),
+                    pa.field('lower_bounds', pa.map_(pa.int32(), pa.binary()), nullable=True),
+                    pa.field('upper_bounds', pa.map_(pa.int32(), pa.binary()), nullable=True),
+                    pa.field('key_metadata', pa.binary(), nullable=True),
+                    pa.field('split_offsets', pa.list_(pa.int64()), nullable=True),
+                    pa.field('equality_ids', pa.list_(pa.int32()), nullable=True),
+                    pa.field('sort_order_id', pa.int32(), nullable=True),
+                ]),
+                nullable=False,
+            ),
+            pa.field('readable_metrics', pa.struct(readable_metrics_struct), nullable=True),
+        ])
+
+        entries = []
+        if snapshot := self.tbl.metadata.current_snapshot():
+            for manifest in snapshot.manifests(self.tbl.io):
+                for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
+                    column_sizes = entry.data_file.column_sizes or {}
+                    value_counts = entry.data_file.value_counts or {}
+                    null_value_counts = entry.data_file.null_value_counts or {}
+                    nan_value_counts = entry.data_file.nan_value_counts or {}
+                    lower_bounds = entry.data_file.lower_bounds or {}
+                    upper_bounds = entry.data_file.upper_bounds or {}
+                    readable_metrics = {
+                        schema.find_column_name(field.field_id): {
+                            "column_size": column_sizes.get(field.field_id),
+                            "value_count": value_counts.get(field.field_id),
+                            "null_value_count": null_value_counts.get(field.field_id),
+                            "nan_value_count": nan_value_counts.get(field.field_id),
+                            # Makes them readable
+                            "lower_bound": from_bytes(field.field_type, lower_bound)
+                            if (lower_bound := lower_bounds.get(field.field_id))
+                            else None,
+                            "upper_bound": from_bytes(field.field_type, upper_bound)
+                            if (upper_bound := upper_bounds.get(field.field_id))
+                            else None,
+                        }
+                        for field in self.tbl.metadata.schema().fields
+                    }
+
+                    partition = entry.data_file.partition
+                    partition_record_dict = {
+                        field.name: partition[pos]
+                        for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
+                    }
+
+                    entries.append({
+                        'status': entry.status.value,
+                        'snapshot_id': entry.snapshot_id,
+                        'sequence_number': entry.data_sequence_number,
+                        'file_sequence_number': entry.file_sequence_number,
+                        'data_file': {
+                            "content": entry.data_file.content,
+                            "file_path": entry.data_file.file_path,
+                            "file_format": entry.data_file.file_format,
+                            "partition": partition_record_dict,
+                            "record_count": entry.data_file.record_count,
+                            "file_size_in_bytes": entry.data_file.file_size_in_bytes,
+                            "column_sizes": dict(entry.data_file.column_sizes),
+                            "value_counts": dict(entry.data_file.value_counts),
+                            "null_value_counts": dict(entry.data_file.null_value_counts),
+                            "nan_value_counts": entry.data_file.nan_value_counts,
+                            "lower_bounds": entry.data_file.lower_bounds,
+                            "upper_bounds": entry.data_file.upper_bounds,
+                            "key_metadata": entry.data_file.key_metadata,
+                            "split_offsets": entry.data_file.split_offsets,
+                            "equality_ids": entry.data_file.equality_ids,
+                            "sort_order_id": entry.data_file.sort_order_id,
+                            "spec_id": entry.data_file.spec_id,
+                        },
+                        'readable_metrics': readable_metrics,
+                    })
+
+        return pa.Table.from_pylist(
+            entries,
+            schema=entries_schema,
+        )
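The `readable_metrics` struct above is built by decoding the raw binary bounds with `from_bytes`. A standalone sketch of that conversion step, reusing values from the documented example output (hex `416D7374657264616D` is the UTF-8 encoding of "Amsterdam"; `8602B68311E34240` is the little-endian double 37.773972):

```python
from pyiceberg.conversions import from_bytes
from pyiceberg.types import DoubleType, StringType

# String bounds are stored as raw UTF-8 bytes in the manifest
print(from_bytes(StringType(), bytes.fromhex("416D7374657264616D")))  # Amsterdam

# Numeric bounds are stored little-endian; eight bytes decode back to a float
print(from_bytes(DoubleType(), bytes.fromhex("8602B68311E34240")))  # 37.773972
```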

pyiceberg/table/metadata.py

Lines changed: 26 additions & 1 deletion
@@ -49,7 +49,7 @@
     IcebergRootModel,
     Properties,
 )
-from pyiceberg.types import transform_dict_value_to_str
+from pyiceberg.types import NestedField, StructType, transform_dict_value_to_str
 from pyiceberg.utils.config import Config
 from pyiceberg.utils.datetime import datetime_to_millis
 
@@ -245,6 +245,31 @@ def specs(self) -> Dict[int, PartitionSpec]:
         """Return a dict the partition specs this table."""
         return {spec.spec_id: spec for spec in self.partition_specs}
 
+    def specs_struct(self) -> StructType:
+        """Produce a struct of all the combined PartitionSpecs.
+
+        The partition fields should be optional: Partition fields may be added later,
+        in which case not all files would have the result field, and it may be null.
+
+        :return: A StructType that represents all the combined PartitionSpecs of the table
+        """
+        specs = self.specs()
+
+        # Collect all the fields
+        struct_fields = {field.field_id: field for spec in specs.values() for field in spec.fields}
+
+        schema = self.schema()
+
+        nested_fields = []
+        # Sort them by field_id in order to get a deterministic output
+        for field_id in sorted(struct_fields):
+            field = struct_fields[field_id]
+            source_type = schema.find_type(field.source_id)
+            result_type = field.transform.result_type(source_type)
+            nested_fields.append(NestedField(field_id=field.field_id, name=field.name, type=result_type, required=False))
+
+        return StructType(*nested_fields)
+
     def new_snapshot_id(self) -> int:
         """Generate a new snapshot-id that's not in use."""
         snapshot_id = _generate_snapshot_id()
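The `field.transform.result_type(source_type)` call is what gives each combined partition field its concrete type. A small illustration with hypothetical columns (not from this commit) of how a transform derives the partition field type from its source column:

```python
from pyiceberg.transforms import BucketTransform, DayTransform
from pyiceberg.types import LongType, TimestampType

# A day transform on a timestamp source partitions by date
print(DayTransform().result_type(TimestampType()))  # date

# A bucket transform always yields an int bucket number
print(BucketTransform(num_buckets=16).result_type(LongType()))  # int
```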

pyiceberg/utils/lazydict.py

Lines changed: 4 additions & 0 deletions
@@ -66,3 +66,7 @@ def __len__(self) -> int:
         """Return the number of items in the dictionary."""
         source = self._dict or self._build_dict()
         return len(source)
+
+    def __dict__(self) -> Dict[K, V]:  # type: ignore
+        """Convert the lazy dict into a dict."""
+        return self._dict or self._build_dict()
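For context, `LazyDict` defers building its backing dict until first access, which keeps manifest parsing cheap. A minimal sketch of the conversion path, assuming the chunked alternating key/value constructor layout PyIceberg uses internally:

```python
from pyiceberg.utils.lazydict import LazyDict

# Assumed layout: each chunk alternates keys and values
lazy = LazyDict([[1, 100, 2, 200]])

assert len(lazy) == 2                    # first access builds the dict
assert dict(lazy) == {1: 100, 2: 200}    # the dict(...) conversion used by entries()
```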

tests/conftest.py

Lines changed: 1 addition & 1 deletion
@@ -2043,5 +2043,5 @@ def pa_schema() -> "pa.Schema":
 def arrow_table_with_null(pa_schema: "pa.Schema") -> "pa.Table":
     import pyarrow as pa
 
-    """PyArrow table with all kinds of columns"""
+    """Pyarrow table with all kinds of columns."""
     return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema)
