Add delete file index to pyiceberg and support equality delete reads #2255
@@ -45,6 +45,7 @@
     Any,
     Callable,
     Dict,
+    FrozenSet,
     Generic,
     Iterable,
     Iterator,
@@ -978,18 +979,23 @@ def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> ds.Fi
     raise ValueError(f"Unsupported file format: {file_format}")


-def _read_deletes(io: FileIO, data_file: DataFile) -> Dict[str, pa.ChunkedArray]:
+def _read_deletes(io: FileIO, data_file: DataFile) -> Union[Dict[str, pa.ChunkedArray], pa.Table]:
     if data_file.file_format == FileFormat.PARQUET:
         with io.new_input(data_file.file_path).open() as fi:
             delete_fragment = _get_file_format(
                 data_file.file_format, dictionary_columns=("file_path",), pre_buffer=True, buffer_size=ONE_MEGABYTE
             ).make_fragment(fi)
             table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table()
-        table = table.unify_dictionaries()
-        return {
-            file.as_py(): table.filter(pc.field("file_path") == file).column("pos")
-            for file in table.column("file_path").chunks[0].dictionary
-        }
+        if data_file.content == DataFileContent.POSITION_DELETES:
+            table = table.unify_dictionaries()
+            return {
+                file.as_py(): table.filter(pc.field("file_path") == file).column("pos")
+                for file in table.column("file_path").chunks[0].dictionary
+            }
+        elif data_file.content == DataFileContent.EQUALITY_DELETES:
+            return table
+        else:
+            raise ValueError(f"Unsupported delete file content: {data_file.content}")
     elif data_file.file_format == FileFormat.PUFFIN:
         with io.new_input(data_file.file_path).open() as fi:
             payload = fi.read()
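For orientation, a minimal sketch of the two delete-file layouts that the updated _read_deletes now has to tell apart (toy paths and values, not taken from the PR): position delete files carry file_path/pos pairs and are still turned into a per-file dictionary of deleted positions, while equality delete files carry the values of their equality columns and are now returned as a plain pa.Table.

import pyarrow as pa

# Toy position delete file contents: each row marks one row position in a data file as deleted.
position_deletes = pa.Table.from_pydict(
    {
        "file_path": ["s3://warehouse/db/tbl/data/00000-0.parquet", "s3://warehouse/db/tbl/data/00000-0.parquet"],
        "pos": [4, 17],
    }
)

# Toy equality delete file contents, assuming its equality_ids point at an `id` column:
# any data row whose id matches one of these values is considered deleted.
equality_deletes = pa.Table.from_pydict({"id": [101, 205]})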
@@ -1445,7 +1451,7 @@ def _task_to_record_batches(
     bound_row_filter: BooleanExpression,
     projected_schema: Schema,
     projected_field_ids: Set[int],
-    positional_deletes: Optional[List[ChunkedArray]],
+    deletes: Optional[List[Union[pa.ChunkedArray, pa.Table]]],
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
     partition_spec: Optional[PartitionSpec] = None,
@@ -1479,9 +1485,20 @@ def _task_to_record_batches(
         schema=physical_schema,
         # This will push down the query to Arrow.
         # But in case there are positional deletes, we have to apply them first
-        filter=pyarrow_filter if not positional_deletes else None,
+        filter=pyarrow_filter if not deletes else None,
         columns=[col.name for col in file_project_schema.columns],
     )
+    positional_deletes = None
+    equality_delete_groups = None
+    if deletes:
+        positional_deletes = [d for d in deletes if isinstance(d, pa.ChunkedArray)]
+        equality_deletes = [d for d in deletes if isinstance(d, pa.Table)]
+
+        # preprocess equality deletes to be applied
+        if equality_deletes:
+            task_eq_files = [df for df in task.delete_files if df.content == DataFileContent.EQUALITY_DELETES]
+            # concatenate equality delete tables with same set of field ids to reduce anti joins
+            equality_delete_groups = _group_deletes_by_equality_ids(task_eq_files, equality_deletes)
+
     next_index = 0
     batches = fragment_scanner.to_batches()
@@ -1499,6 +1516,17 @@ def _task_to_record_batches(
         if current_batch.num_rows == 0:
             continue

+        if equality_delete_groups:
+            table = pa.Table.from_batches([current_batch])
+            for equality_ids, combined_table in equality_delete_groups.items():
+                table = _apply_equality_deletes(table, combined_table, list(equality_ids), file_schema)
+                if table.num_rows == 0:
+                    break
+            if table.num_rows > 0:
+                current_batch = table.combine_chunks().to_batches()[0]
+            else:
+                continue
+
         # Apply the user filter
         if pyarrow_filter is not None:
             # Temporary fix until PyArrow 21 is released ( https://github.com/apache/arrow/pull/46057 )
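A short, self-contained illustration (toy data only, not from the PR) of why the branch above calls combine_chunks() before taking to_batches()[0]: the anti joins can leave the intermediate table split into several chunks, and Table.to_batches() yields one RecordBatch per chunk, so combining first guarantees a single batch that covers every surviving row.

import pyarrow as pa

# Build a table whose columns are split into two chunks.
table = pa.Table.from_batches(
    [
        pa.RecordBatch.from_pydict({"id": [1, 2]}),
        pa.RecordBatch.from_pydict({"id": [3]}),
    ]
)
assert len(table.to_batches()) == 2                   # one batch per chunk
assert len(table.combine_chunks().to_batches()) == 1  # single batch containing all rows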
@@ -1528,22 +1556,64 @@ def _task_to_record_batches(
     yield result_batch


-def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[str, List[ChunkedArray]]:
-    deletes_per_file: Dict[str, List[ChunkedArray]] = {}
-    unique_deletes = set(itertools.chain.from_iterable([task.delete_files for task in tasks]))
-    if len(unique_deletes) > 0:
+def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Union[Dict[str, pa.ChunkedArray], pa.Table]:
Reviewer comment on the new return type annotation: Should this be: [suggested change omitted]
+    deletes_per_file: Dict[str, List[Union[pa.ChunkedArray, pa.Table]]] = {}
+
+    positional_deletes = {
+        df
+        for task in tasks
+        for df in task.delete_files
+        if df.content == DataFileContent.POSITION_DELETES and df.file_format != FileFormat.PUFFIN
+    }
+    if positional_deletes:
         executor = ExecutorFactory.get_or_create()
         deletes_per_files: Iterator[Dict[str, ChunkedArray]] = executor.map(
             lambda args: _read_deletes(*args),
-            [(io, delete_file) for delete_file in unique_deletes],
+            [(io, delete_file) for delete_file in positional_deletes],
         )
         for delete in deletes_per_files:
             for file, arr in delete.items():
                 if file in deletes_per_file:
                     deletes_per_file[file].append(arr)
                 else:
                     deletes_per_file[file] = [arr]
+    deletion_vectors = {
+        df
+        for task in tasks
+        for df in task.delete_files
+        if df.content == DataFileContent.POSITION_DELETES and df.file_format == FileFormat.PUFFIN
+    }
+    if deletion_vectors:
+        executor = ExecutorFactory.get_or_create()
+        dv_results: Iterator[Dict[str, ChunkedArray]] = executor.map(
+            lambda args: _read_deletes(*args),
+            [(io, delete_file) for delete_file in deletion_vectors],
+        )
+        for delete in dv_results:
+            for file, arr in delete.items():
+                # Deletion vectors replace all position deletes for a file
+                deletes_per_file[file] = [arr]
+
+    equality_delete_tasks = []
+    for task in tasks:
+        equality_deletes = [df for df in task.delete_files if df.content == DataFileContent.EQUALITY_DELETES]
+        if equality_deletes:
+            for delete_file in equality_deletes:
+                # pair the data file with its associated equality delete file
+                equality_delete_tasks.append((task.file.file_path, delete_file))
+
+    if equality_delete_tasks:
+        executor = ExecutorFactory.get_or_create()
+        # Process equality delete tasks in parallel, like position deletes
+        equality_delete_results = executor.map(
+            lambda args: (args[0], _read_deletes(io, args[1])),
+            equality_delete_tasks,
+        )
Reviewer comment: We are already getting a subset of the files that have equality deletes, so it would make sense to use a dedicated function to read these deletes rather than the convoluted _read_deletes. (A sketch of such a helper follows this hunk.)
+
+        for file_path, equality_delete_table in equality_delete_results:
+            if file_path not in deletes_per_file:
+                deletes_per_file[file_path] = []
+            deletes_per_file[file_path].append(equality_delete_table)
     return deletes_per_file
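To illustrate the reviewer's suggestion above, here is a rough sketch of what a dedicated equality-delete reader could look like. The name _read_equality_deletes and its exact shape are assumptions for illustration, not part of the PR; it reuses the module-level names (FileIO, DataFile, FileFormat, ds, pa, _get_file_format) that _read_deletes already relies on.

def _read_equality_deletes(io: FileIO, data_file: DataFile) -> pa.Table:
    # Hypothetical helper, not in the PR: read a Parquet equality delete file straight
    # into an Arrow table, skipping the file_path dictionary handling that only
    # position deletes need.
    if data_file.file_format != FileFormat.PARQUET:
        raise ValueError(f"Unsupported file format for equality deletes: {data_file.file_format}")
    with io.new_input(data_file.file_path).open() as fi:
        fragment = _get_file_format(data_file.file_format).make_fragment(fi)
        return ds.Scanner.from_fragment(fragment=fragment).to_table()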
@@ -1679,7 +1749,7 @@ def batches_for_task(task: FileScanTask) -> List[pa.RecordBatch]:
                     break

    def _record_batches_from_scan_tasks_and_deletes(
-        self, tasks: Iterable[FileScanTask], deletes_per_file: Dict[str, List[ChunkedArray]]
+        self, tasks: Iterable[FileScanTask], deletes_per_file: Union[Dict[str, pa.ChunkedArray], pa.Table]
Reviewer comment on the deletes_per_file annotation: [suggested change omitted]
    ) -> Iterator[pa.RecordBatch]:
        total_row_count = 0
        for task in tasks:
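For reference, with this change the deletes_per_file value consumed here maps each data file path to a mixed list of position-delete arrays and equality-delete tables, roughly like the following (illustrative paths and values only):

import pyarrow as pa

deletes_per_file = {
    "s3://warehouse/db/tbl/data/00000-0.parquet": [
        pa.chunked_array([[4, 17]]),           # positions removed by position delete files / deletion vectors
        pa.Table.from_pydict({"id": [101]}),   # rows removed by an equality delete file
    ],
}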
@@ -2799,3 +2869,52 @@ def _get_field_from_arrow_table(arrow_table: pa.Table, field_path: str) -> pa.Ar
     field_array = arrow_table[path_parts[0]]
     # Navigate into the struct using the remaining path parts
     return pc.struct_field(field_array, path_parts[1:])
+
+
+def _group_deletes_by_equality_ids(
+    task_eq_files: List[DataFile], equality_delete_tables: List[pa.Table]
+) -> dict[FrozenSet[int], pa.Table]:
+    """Concatenate equality delete tables by their equality IDs to reduce the number of anti joins."""
+    from collections import defaultdict
+
+    equality_delete_groups: Dict[FrozenSet[int], pa.Table] = {}
+    group_map = defaultdict(list)
+
+    # Group the tables by their equality IDs
+    for delete_file, delete_table in zip(task_eq_files, equality_delete_tables):
+        if delete_file.equality_ids is not None:
+            key = frozenset(delete_file.equality_ids)
+            group_map[key].append(delete_table)
+
+    # Concatenate Arrow tables within the same group
+    for equality_ids, delete_tables in group_map.items():
+        if delete_tables:
+            equality_delete_groups[equality_ids] = pa.concat_tables(delete_tables) if len(delete_tables) > 1 else delete_tables[0]
+    return equality_delete_groups
+
+
+def _apply_equality_deletes(
+    data_table: pa.Table, delete_table: pa.Table, equality_ids: List[int], data_schema: Optional[Schema]
+) -> pa.Table:
+    """Apply equality deletes to a data table.
+
+    Filter out rows of the data table that match the conditions in the equality delete table.
+
+    Args:
+        data_table: A PyArrow table with the data to filter
+        delete_table: A PyArrow table containing the equality deletes
+        equality_ids: A list of field IDs to use for the equality comparison
+        data_schema: The Iceberg schema of the data table
+    Returns:
+        A filtered PyArrow table with matching rows removed
+    """
+    if len(delete_table) == 0:
+        return data_table
+    if data_schema is None:
+        raise ValueError("Schema is required for applying equality deletes")
+
+    # Resolve the correct columns to be used in the anti join
+    equality_columns = [data_schema.find_field(fid).name for fid in equality_ids]
+
+    # Use PyArrow's join function with left anti join type
+    result = data_table.join(delete_table.select(equality_columns), keys=equality_columns, join_type="left anti")
+    return result
Reviewer comment: I think the output signature and the role of this function are convoluted. Would it make sense to have two separate functions instead?
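Separately from the design question above, a self-contained toy example (made-up column names and values) of the left anti join that _apply_equality_deletes builds on:

import pyarrow as pa

data = pa.Table.from_pydict({"id": [1, 2, 3], "name": ["a", "b", "c"]})
deletes = pa.Table.from_pydict({"id": [2]})

# Rows whose key columns match any row in `deletes` are dropped.
result = data.join(deletes.select(["id"]), keys=["id"], join_type="left anti")
print(result.sort_by("id").to_pydict())  # {'id': [1, 3], 'name': ['a', 'c']}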