
Commit 6036e12 (1 parent: 536528e)

wip: enhance deduplication logic and improve data file handling in maintenance operations

File tree: 3 files changed, +172 -160 lines

pyiceberg/table/inspect.py

Lines changed: 35 additions & 10 deletions
@@ -17,13 +17,15 @@
 from __future__ import annotations
 
 from datetime import datetime, timezone
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple, Union
+from functools import reduce
+from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple, Union
 
 from pyiceberg.conversions import from_bytes
 from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
 from pyiceberg.partitioning import PartitionSpec
 from pyiceberg.table.snapshots import Snapshot, ancestors_of
 from pyiceberg.types import PrimitiveType
+from pyiceberg.utils.concurrent import ExecutorFactory
 from pyiceberg.utils.singleton import _convert_to_hashable_type
 
 if TYPE_CHECKING:
@@ -649,14 +651,11 @@ def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[S
 
         snapshot = self._get_snapshot(snapshot_id)
         io = self.tbl.io
+        files_table: list[pa.Table] = []
+        for manifest_list in snapshot.manifests(io):
+            files_table.append(self._get_files_from_manifest(manifest_list, data_file_filter))
 
-        executor = ExecutorFactory.get_or_create()
-        results = list(
-            executor.map(
-                lambda manifest_list: self._get_files_from_manifest(manifest_list, data_file_filter), snapshot.manifests(io)
-            )
-        )
-        return pa.concat_tables(results)
+        return pa.concat_tables(files_table)
 
     def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
         return self._files(snapshot_id)
@@ -683,13 +682,39 @@ def all_manifests(self, snapshots: Optional[Union[list[Snapshot], list[int]]] =
 
         if not snapshots:
             return pa.Table.from_pylist([], schema=self._get_all_manifests_schema())
-
+
         executor = ExecutorFactory.get_or_create()
         manifests_by_snapshots: Iterator["pa.Table"] = executor.map(
             lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
         )
         return pa.concat_tables(manifests_by_snapshots)
 
+    def _all_known_files(self) -> dict[str, set[str]]:
+        """Get all the known files in the table.
+
+        Returns:
+            dict of {file_type: set of file paths} for each file type.
+        """
+        snapshots = self.tbl.snapshots()
+
+        _all_known_files = {}
+        _all_known_files["manifests"] = set(self.all_manifests(snapshots)["path"].to_pylist())
+        _all_known_files["manifest_lists"] = {snapshot.manifest_list for snapshot in snapshots}
+        _all_known_files["statistics"] = {statistic.statistics_path for statistic in self.tbl.metadata.statistics}
+
+        metadata_files = {entry.metadata_file for entry in self.tbl.metadata.metadata_log}
+        metadata_files.add(self.tbl.metadata_location)  # Include current metadata file
+        _all_known_files["metadata"] = metadata_files
+
+        executor = ExecutorFactory.get_or_create()
+        snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots]
+        files_by_snapshots: Iterator[Set[str]] = executor.map(
+            lambda snapshot_id: set(self.files(snapshot_id)["file_path"].to_pylist()), snapshot_ids
+        )
+        _all_known_files["datafiles"] = reduce(set.union, files_by_snapshots, set())
+
+        return _all_known_files
+
     def _all_files(self, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
         import pyarrow as pa
 
@@ -715,4 +740,4 @@ def all_data_files(self) -> "pa.Table":
         return self._all_files({DataFileContent.DATA})
 
     def all_delete_files(self) -> "pa.Table":
-        return self._all_files({DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})
+        return self._all_files({DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})
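
For orientation, here is a brief usage sketch of the inspect entry points touched above. The catalog name and table identifier are made up, and _all_known_files is a private helper added by this commit, called here only to illustrate the shape of its return value.

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")        # hypothetical catalog name
tbl = catalog.load_table("db.events")    # hypothetical table identifier

# Public inspect entry points used by the new helper
files = tbl.inspect.files()              # pyarrow.Table of file entries for the current snapshot
manifests = tbl.inspect.all_manifests()  # pyarrow.Table of manifests across snapshots

# The private helper added above returns one set of paths per file type:
# {"manifests": ..., "manifest_lists": ..., "statistics": ..., "metadata": ..., "datafiles": ...}
known = tbl.inspect._all_known_files()
for file_type, paths in sorted(known.items()):
    print(file_type, len(paths))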

pyiceberg/table/maintenance.py

Lines changed: 85 additions & 84 deletions
@@ -105,9 +105,7 @@ def expire_snapshots_older_than(self, timestamp_ms: int) -> None:
 
         if snapshots_to_expire:
             with self.tbl.transaction() as txn:
-                from pyiceberg.table.update import RemoveSnapshotsUpdate
-
-                txn._apply((RemoveSnapshotsUpdate(snapshot_ids=snapshots_to_expire),))
+                self.expire_snapshots_by_ids(snapshots_to_expire)
 
     def expire_snapshots_older_than_with_retention(
         self, timestamp_ms: int, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None
@@ -125,9 +123,7 @@ def expire_snapshots_older_than_with_retention(
 
         if snapshots_to_expire:
             with self.tbl.transaction() as txn:
-                from pyiceberg.table.update import RemoveSnapshotsUpdate
-
-                txn._apply((RemoveSnapshotsUpdate(snapshot_ids=snapshots_to_expire),))
+                self.expire_snapshots_by_ids(snapshots_to_expire)
 
     def retain_last_n_snapshots(self, n: int) -> None:
         """Keep only the last N snapshots, expiring all others.
@@ -163,9 +159,7 @@ def retain_last_n_snapshots(self, n: int) -> None:
 
         if snapshots_to_expire:
             with self.tbl.transaction() as txn:
-                from pyiceberg.table.update import RemoveSnapshotsUpdate
-
-                txn._apply((RemoveSnapshotsUpdate(snapshot_ids=snapshots_to_expire),))
+                self.expire_snapshots_by_ids(snapshots_to_expire)
 
     def _get_snapshots_to_expire_with_retention(
         self, timestamp_ms: Optional[int] = None, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None
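
The three hunks above route snapshot expiration through expire_snapshots_by_ids instead of applying RemoveSnapshotsUpdate inline. The body of that method is not part of this diff; a minimal sketch of an equivalent implementation, assuming it simply wraps the same update the removed lines applied (the class name MaintenanceTable is assumed here for illustration), could look like this:

from typing import List

from pyiceberg.table.update import RemoveSnapshotsUpdate


class MaintenanceTable:  # class name assumed; only the sketched method is shown
    def expire_snapshots_by_ids(self, snapshot_ids: List[int]) -> None:
        """Expire the given snapshot IDs in one transaction (sketch, not the committed implementation)."""
        with self.tbl.transaction() as txn:
            # Same update the removed inline code applied directly
            txn._apply((RemoveSnapshotsUpdate(snapshot_ids=snapshot_ids),))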
@@ -298,101 +292,108 @@ def _get_protected_snapshot_ids(self, table_metadata: TableMetadata) -> Set[int]
                 protected_ids.add(ref.snapshot_id)
         return protected_ids
 
-    def _get_all_datafiles(
-        self,
-        scan_all_snapshots: bool = False,
-        target_file_path: Optional[str] = None,
-        parallel: bool = True,
-    ) -> List[DataFile]:
-        """Collect all DataFiles in the table, optionally filtering by file path."""
+    def _get_all_datafiles(self) -> List[DataFile]:
+        """Collect all DataFiles in the table, scanning all partitions."""
         datafiles: List[DataFile] = []
 
         def process_manifest(manifest: ManifestFile) -> list[DataFile]:
            found: list[DataFile] = []
            for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
                if hasattr(entry, "data_file"):
-                    df = entry.data_file
-                    if target_file_path is None or df.file_path == target_file_path:
-                        found.append(df)
+                    found.append(entry.data_file)
            return found
 
-        if scan_all_snapshots:
-            manifests = []
-            for snapshot in self.tbl.snapshots():
-                manifests.extend(snapshot.manifests(io=self.tbl.io))
-            if parallel:
-                with ThreadPoolExecutor() as executor:
-                    results = executor.map(process_manifest, manifests)
-                for res in results:
-                    datafiles.extend(res)
-            else:
-                for manifest in manifests:
-                    datafiles.extend(process_manifest(manifest))
-        else:
-            # Only current snapshot
-            for chunk in self.tbl.inspect.data_files().to_pylist():
-                file_path = chunk.get("file_path")
-                partition: dict[str, Any] = dict(chunk.get("partition", {}) or {})
-                if target_file_path is None or file_path == target_file_path:
-                    datafiles.append(DataFile(file_path=file_path, partition=partition))
+        # Scan all snapshots
+        manifests = []
+        for snapshot in self.tbl.snapshots():
+            manifests.extend(snapshot.manifests(io=self.tbl.io))
+        with ThreadPoolExecutor() as executor:
+            results = executor.map(process_manifest, manifests)
+        for res in results:
+            datafiles.extend(res)
+
+        return datafiles
+
+    def _get_all_datafiles_with_context(self) -> List[tuple[DataFile, str, int]]:
+        """Collect all DataFiles in the table, scanning all partitions, with manifest context."""
+        datafiles: List[tuple[DataFile, str, int]] = []
+
+        def process_manifest(manifest: ManifestFile) -> list[tuple[DataFile, str, int]]:
+            found: list[tuple[DataFile, str, int]] = []
+            for idx, entry in enumerate(manifest.fetch_manifest_entry(io=self.tbl.io)):
+                if hasattr(entry, "data_file"):
+                    found.append((entry.data_file, getattr(manifest, 'manifest_path', str(manifest)), idx))
+            return found
+
+        # Scan all snapshots
+        manifests = []
+        for snapshot in self.tbl.snapshots():
+            manifests.extend(snapshot.manifests(io=self.tbl.io))
+        with ThreadPoolExecutor() as executor:
+            results = executor.map(process_manifest, manifests)
+        for res in results:
+            datafiles.extend(res)
+
         return datafiles
 
-    def deduplicate_data_files(
-        self,
-        scan_all_partitions: bool = True,
-        scan_all_snapshots: bool = False,
-        to_remove: Optional[List[Union[DataFile, str]]] = None,
-        parallel: bool = True,
-    ) -> List[DataFile]:
+    def _detect_duplicates(self, all_datafiles_with_context: List[tuple[DataFile, str, int]]) -> List[DataFile]:
+        """Detect duplicate data files based on file name and extension."""
+        seen = {}
+        processed_entries = set()
+        duplicates = []
+
+        for df, manifest_path, entry_idx in all_datafiles_with_context:
+            # Extract file name and extension
+            file_name_with_extension = df.file_path.split("/")[-1]
+            entry_key = (manifest_path, entry_idx)
+
+            if file_name_with_extension in seen:
+                if entry_key not in processed_entries:
+                    duplicates.append(df)
+                    processed_entries.add(entry_key)
+            else:
+                seen[file_name_with_extension] = (df, manifest_path, entry_idx)
+
+        return duplicates
+
+    def deduplicate_data_files(self) -> List[DataFile]:
         """
         Remove duplicate data files from an Iceberg table.
 
-        Args:
-            scan_all_partitions: If True, scan all partitions for duplicates (uses file_path+partition as key).
-            scan_all_snapshots: If True, scan all snapshots for duplicates, otherwise only current snapshot.
-            to_remove: List of DataFile objects or file path strings to remove. If None, auto-detect duplicates.
-            parallel: If True, parallelize manifest traversal.
-
         Returns:
             List of removed DataFile objects.
         """
         removed: List[DataFile] = []
 
-        # Determine what to remove
-        if to_remove is None:
-            # Auto-detect duplicates
-            all_datafiles = self._get_all_datafiles(scan_all_snapshots=scan_all_snapshots, parallel=parallel)
-            seen = {}
-            duplicates = []
-            for df in all_datafiles:
-                partition: dict[str, Any] = df.partition.to_dict() if hasattr(df.partition, "to_dict") else {}
-                if scan_all_partitions:
-                    key = (df.file_path, tuple(sorted(partition.items())) if partition else ())
-                else:
-                    key = (df.file_path, ())  # Add an empty tuple for partition when scan_all_partitions is False
-                if key in seen:
-                    duplicates.append(df)
-                else:
-                    seen[key] = df
-            to_remove = duplicates  # type: ignore[assignment]
-
-        # Normalize to DataFile objects
-        normalized_to_remove: List[DataFile] = []
-        all_datafiles = self._get_all_datafiles(scan_all_snapshots=scan_all_snapshots, parallel=parallel)
-        for item in to_remove or []:
-            if isinstance(item, DataFile):
-                normalized_to_remove.append(item)
-            elif isinstance(item, str):
-                # Remove all DataFiles with this file_path
-                for df in all_datafiles:
-                    if df.file_path == item:
-                        normalized_to_remove.append(df)
-            else:
-                raise ValueError(f"Unsupported type in to_remove: {type(item)}")
+        # Collect all data files
+        all_datafiles_with_context = self._get_all_datafiles_with_context()
+
+        # Detect duplicates
+        duplicates = self._detect_duplicates(all_datafiles_with_context)
 
         # Remove the DataFiles
-        for df in normalized_to_remove:
-            self.tbl.transaction().update_snapshot().overwrite().delete_data_file(df).commit()
+        for df in duplicates:
+            self.tbl.transaction().update_snapshot().overwrite().delete_data_file(df)
             removed.append(df)
 
         return removed
+
+    def _detect_duplicates(self, all_datafiles_with_context: List[tuple[DataFile, str, int]]) -> List[DataFile]:
+        """Detect duplicate data files based on file path and partition."""
+        seen = {}
+        processed_entries = set()
+        duplicates = []
+
+        for df, manifest_path, entry_idx in all_datafiles_with_context:
+            partition: dict[str, Any] = df.partition.to_dict() if hasattr(df.partition, "to_dict") else {}
+            key = (df.file_path, tuple(sorted(partition.items())) if partition else ())
+            entry_key = (manifest_path, entry_idx)
+
+            if key in seen:
+                if entry_key not in processed_entries:
+                    duplicates.append(df)
+                    processed_entries.add(entry_key)
+            else:
+                seen[key] = (df, manifest_path, entry_idx)
+
+        return duplicates