Commit 74497fb

Merge branch 'main' of github.com:apache/iceberg-python into fd-add-ability-to-delete-full-data-files
2 parents a97c45a + 87656fb

File tree

11 files changed, +661 -284 lines

mkdocs/docs/api.md

Lines changed: 78 additions & 0 deletions
@@ -342,6 +342,18 @@ table.append(df)
 
 To explore the table metadata, tables can be inspected.
 
+<!-- prettier-ignore-start -->
+
+!!! tip "Time Travel"
+    To inspect a table's metadata with the time travel feature, call the inspect table method with the `snapshot_id` argument.
+    Time travel is supported on all metadata tables except `snapshots` and `refs`.
+
+    ```python
+    table.inspect.entries(snapshot_id=805611270568163028)
+    ```
+
+<!-- prettier-ignore-end -->
+
 ### Snapshots
 
 Inspect the snapshots of the table:
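The tip added in this hunk states that time travel works on every metadata table except `snapshots` and `refs`, so the same `snapshot_id` argument should apply to the other inspect methods as well. A minimal sketch, reusing the snapshot ID from the example above:

```python
# Hypothetical call: inspect the partitions metadata table as of an earlier snapshot.
table.inspect.partitions(snapshot_id=805611270568163028)
```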
@@ -370,6 +382,47 @@ manifest_list: [["s3://warehouse/default/table_metadata_snapshots/metadata/snap-
 summary: [[keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","1","0","3","5459","0","0"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-records",...,"total-equality-deletes","total-files-size","deleted-data-files","deleted-records","removed-files-size"]values:["5459","1","3","1","3",...,"0","5459","1","3","5459"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","2","0","6","10918","0","0"]]]
 ```
 
+### Partitions
+
+Inspect the partitions of the table:
+
+```python
+table.inspect.partitions()
+```
+
+```
+pyarrow.Table
+partition: struct<dt_month: int32, dt_day: date32[day]> not null
+  child 0, dt_month: int32
+  child 1, dt_day: date32[day]
+spec_id: int32 not null
+record_count: int64 not null
+file_count: int32 not null
+total_data_file_size_in_bytes: int64 not null
+position_delete_record_count: int64 not null
+position_delete_file_count: int32 not null
+equality_delete_record_count: int64 not null
+equality_delete_file_count: int32 not null
+last_updated_at: timestamp[ms]
+last_updated_snapshot_id: int64
+----
+partition: [
+  -- is_valid: all not null
+  -- child 0 type: int32
+[null,null,612]
+  -- child 1 type: date32[day]
+[null,2021-02-01,null]]
+spec_id: [[2,1,0]]
+record_count: [[1,1,2]]
+file_count: [[1,1,2]]
+total_data_file_size_in_bytes: [[641,641,1260]]
+position_delete_record_count: [[0,0,0]]
+position_delete_file_count: [[0,0,0]]
+equality_delete_record_count: [[0,0,0]]
+equality_delete_file_count: [[0,0,0]]
+last_updated_at: [[2024-04-13 18:59:35.981,2024-04-13 18:59:35.465,2024-04-13 18:59:35.003]]
+```
+
 ### Entries
 
 To show all the table's current manifest entries for both data and delete files.
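Since `inspect.partitions()` returns a `pyarrow.Table`, the result from the hunk above can be handed straight to pandas for ad-hoc analysis. A small sketch using the column names shown in the output (assumes pandas is installed):

```python
# Convert the partitions metadata to pandas and rank partitions by data size.
df = table.inspect.partitions().to_pandas()
print(df.sort_values("total_data_file_size_in_bytes", ascending=False).head())
```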
@@ -528,6 +581,31 @@ readable_metrics: [
 [6.0989]]
 ```
 
+### References
+
+To show a table's known snapshot references:
+
+```python
+table.inspect.refs()
+```
+
+```
+pyarrow.Table
+name: string not null
+type: string not null
+snapshot_id: int64 not null
+max_reference_age_in_ms: int64
+min_snapshots_to_keep: int32
+max_snapshot_age_in_ms: int64
+----
+name: [["main","testTag"]]
+type: [["BRANCH","TAG"]]
+snapshot_id: [[2278002651076891950,2278002651076891950]]
+max_reference_age_in_ms: [[null,604800000]]
+min_snapshots_to_keep: [[null,10]]
+max_snapshot_age_in_ms: [[null,604800000]]
+```
+
 ## Add Files
 
 Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
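A minimal sketch of that workflow, assuming `add_files` accepts a list of Parquet file paths whose schema already matches the table (the paths below are illustrative):

```python
# Register existing Parquet files as data files without rewriting them.
table.add_files(
    file_paths=[
        "s3://warehouse/default/existing-1.parquet",
        "s3://warehouse/default/existing-2.parquet",
    ]
)
```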

pyiceberg/catalog/hive.py

Lines changed: 20 additions & 18 deletions
@@ -372,22 +372,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
         identifier_tuple = self.identifier_to_tuple_without_catalog(
             tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
         )
-        current_table = self.load_table(identifier_tuple)
         database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
-        base_metadata = current_table.metadata
-        for requirement in table_request.requirements:
-            requirement.validate(base_metadata)
-
-        updated_metadata = update_table_metadata(base_metadata, table_request.updates)
-        if updated_metadata == base_metadata:
-            # no changes, do nothing
-            return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)
-
-        # write new metadata
-        new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
-        new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
-        self._write_metadata(updated_metadata, current_table.io, new_metadata_location)
-
         # commit to hive
         # https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232
         with self._client as open_client:
@@ -397,11 +382,28 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
                 if lock.state != LockState.ACQUIRED:
                     raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")
 
-                tbl = open_client.get_table(dbname=database_name, tbl_name=table_name)
-                tbl.parameters = _construct_parameters(
+                hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
+                io = load_file_io({**self.properties, **hive_table.parameters}, hive_table.sd.location)
+                current_table = self._convert_hive_into_iceberg(hive_table, io)
+
+                base_metadata = current_table.metadata
+                for requirement in table_request.requirements:
+                    requirement.validate(base_metadata)
+
+                updated_metadata = update_table_metadata(base_metadata, table_request.updates)
+                if updated_metadata == base_metadata:
+                    # no changes, do nothing
+                    return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)
+
+                # write new metadata
+                new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
+                new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
+                self._write_metadata(updated_metadata, current_table.io, new_metadata_location)
+
+                hive_table.parameters = _construct_parameters(
                     metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location
                 )
-                open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=tbl)
+                open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table)
             except NoSuchObjectException as e:
                 raise NoSuchTableError(f"Table does not exist: {table_name}") from e
             finally:
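The rewritten `_commit_table` now loads and validates the current table state only after the metastore lock has been acquired; reading it beforehand would let another writer commit in between, and the requirements would then be validated against stale metadata. A generic, runnable illustration of that check-then-act-under-lock rationale (not pyiceberg API, just the pattern):

```python
import threading

_lock = threading.Lock()
_state = {"version": 0}

def commit(increment: int) -> None:
    # Read the current state only after the lock is held; reading it before
    # acquiring the lock could build and validate updates against stale state.
    with _lock:
        current = _state["version"]
        _state["version"] = current + increment

commit(1)
print(_state)  # {'version': 1}
```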

pyiceberg/io/pyarrow.py

Lines changed: 17 additions & 14 deletions
@@ -966,20 +966,15 @@ def _task_to_table(
     with fs.open_input_file(path) as fin:
         fragment = arrow_format.make_fragment(fin)
         physical_schema = fragment.physical_schema
-        schema_raw = None
-        if metadata := physical_schema.metadata:
-            schema_raw = metadata.get(ICEBERG_SCHEMA)
-        file_schema = (
-            Schema.model_validate_json(schema_raw) if schema_raw is not None else pyarrow_to_schema(physical_schema, name_mapping)
-        )
+        file_schema = pyarrow_to_schema(physical_schema, name_mapping)
 
         pyarrow_filter = None
         if bound_row_filter is not AlwaysTrue():
             translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive)
             bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
             pyarrow_filter = expression_to_pyarrow(bound_file_filter)
 
-        file_project_schema = sanitize_column_names(prune_columns(file_schema, projected_field_ids, select_full_types=False))
+        file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)
 
         if file_schema is None:
             raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}")
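For context on the new `file_schema = pyarrow_to_schema(physical_schema, name_mapping)` line: the Iceberg schema is now derived from the Arrow physical schema, using the Parquet field IDs carried in the field metadata when present, with `name_mapping` as the fallback. A minimal sketch with a hypothetical two-column schema; it assumes field IDs under the `PARQUET:field_id` metadata key are enough, so no name mapping is passed:

```python
import pyarrow as pa

from pyiceberg.io.pyarrow import pyarrow_to_schema

# Hypothetical physical schema whose fields carry Parquet field IDs.
physical_schema = pa.schema(
    [
        pa.field("id", pa.int64(), nullable=False, metadata={"PARQUET:field_id": "1"}),
        pa.field("name", pa.string(), metadata={"PARQUET:field_id": "2"}),
    ]
)

print(pyarrow_to_schema(physical_schema))
```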
@@ -1022,7 +1017,6 @@ def _task_to_table(
 
         if len(arrow_table) < 1:
             return None
-
         return to_requested_schema(projected_schema, file_project_schema, arrow_table)
 
 
@@ -1783,25 +1777,34 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterable["Write
 
     schema = table_metadata.schema()
     arrow_file_schema = schema.as_arrow()
-    parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
 
+    parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
     row_group_size = PropertyUtil.property_as_int(
         properties=table_metadata.properties,
         property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
         default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
     )
 
     def write_parquet(task: WriteTask) -> DataFile:
+        table_schema = task.schema
+        arrow_table = pa.Table.from_batches(task.record_batches)
+        # if schema needs to be transformed, use the transformed schema and adjust the arrow table accordingly
+        # otherwise use the original schema
+        if (sanitized_schema := sanitize_column_names(table_schema)) != table_schema:
+            file_schema = sanitized_schema
+            arrow_table = to_requested_schema(requested_schema=file_schema, file_schema=table_schema, table=arrow_table)
+        else:
+            file_schema = table_schema
+
         file_path = f'{table_metadata.location}/data/{task.generate_data_file_path("parquet")}'
         fo = io.new_output(file_path)
         with fo.create(overwrite=True) as fos:
-            with pq.ParquetWriter(fos, schema=arrow_file_schema, **parquet_writer_kwargs) as writer:
-                writer.write(pa.Table.from_batches(task.record_batches), row_group_size=row_group_size)
-
+            with pq.ParquetWriter(fos, schema=file_schema.as_arrow(), **parquet_writer_kwargs) as writer:
+                writer.write(arrow_table, row_group_size=row_group_size)
         statistics = data_file_statistics_from_parquet_metadata(
             parquet_metadata=writer.writer.metadata,
-            stats_columns=compute_statistics_plan(schema, table_metadata.properties),
-            parquet_column_mapping=parquet_path_to_id_mapping(schema),
+            stats_columns=compute_statistics_plan(file_schema, table_metadata.properties),
+            parquet_column_mapping=parquet_path_to_id_mapping(file_schema),
         )
         data_file = DataFile(
             content=DataFileContent.DATA,