10 changes: 2 additions & 8 deletions pyiceberg/io/pyarrow.py
@@ -1726,16 +1726,10 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
         deletes_per_file = _read_all_delete_files(self._io, tasks)

         total_row_count = 0
-        executor = ExecutorFactory.get_or_create()
-
-        def batches_for_task(task: FileScanTask) -> List[pa.RecordBatch]:
-            # Materialize the iterator here to ensure execution happens within the executor.
-            # Otherwise, the iterator would be lazily consumed later (in the main thread),
-            # defeating the purpose of using executor.map.
-            return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
Comment on lines -1729 to -1735
Contributor:
This logic was added in #1995 to parallelize the work using the executor.

> My expectation for to_record_batches was that it would yield batches and not materialize an entire parquet file in memory, but it looks like the current implementation explicitly does this.

That would be my expectation as well. The current implementation reads an entire parquet file.

BUT I do want to still keep the ability to parallelize reading...
Let's see if there's a better way to refactor this.
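Purely as an illustration of one possible direction (not something this PR implements), a bounded sliding window over submitted tasks would keep reads overlapping across files while capping how many task results sit in memory at once; `bounded_map` and its window size are hypothetical names:

```python
# Illustrative sketch only, not pyiceberg API: overlap reads across scan tasks
# while never holding more than `max_inflight` task results in memory at once.
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Deque, Iterable, Iterator, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def bounded_map(fn: Callable[[T], R], items: Iterable[T], max_inflight: int = 2) -> Iterator[R]:
    """Like executor.map, but submits at most `max_inflight` items ahead of the consumer."""
    with ThreadPoolExecutor(max_workers=max_inflight) as pool:
        pending: Deque[Future] = deque()
        for item in items:
            pending.append(pool.submit(fn, item))
            if len(pending) >= max_inflight:
                yield pending.popleft().result()  # block until the oldest task finishes
        while pending:
            yield pending.popleft().result()
```

Because this is a generator, nothing new is submitted until the caller asks for the next result, which is what keeps the window bounded.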

Author (@tom-at-rewbi), Nov 3, 2025:
If I'm not mistaken, reading is still parallel after this change, albeit in a different way. Every time a scan task is processed, pyarrow.dataset.Scanner.from_fragment is called with the default value of use_threads, which enables maximum parallelism. (link to source)

pyarrow.dataset.Scanner also has configurable options for readahead (also enabled by default), which I assume are parallelized, but I am not sure.

Are there any situations where we would expect stacking these two forms of parallelism to be beneficial?
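As a rough sketch of the per-file parallelism described above (not pyiceberg code; the path is a placeholder and the parameter values shown are pyarrow's defaults as I understand them):

```python
# Reading one parquet fragment through pyarrow.dataset.Scanner: with
# use_threads=True (the default) decoding runs on pyarrow's CPU thread pool,
# and batch readahead prefetches batches ahead of the consumer, even without
# any outer executor. "example.parquet" is a placeholder path.
import pyarrow.dataset as ds

dataset = ds.dataset("example.parquet", format="parquet")
fragment = next(dataset.get_fragments())

scanner = ds.Scanner.from_fragment(
    fragment,
    use_threads=True,    # pyarrow's default
    batch_readahead=16,  # pyarrow's default readahead depth
)
for batch in scanner.to_batches():
    ...  # batches stream lazily; the whole file is not materialized up front
```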


         limit_reached = False
-        for batches in executor.map(batches_for_task, tasks):
+        for task in tasks:
+            batches = self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file)
             for batch in batches:
                 current_batch_size = len(batch)
                 if self._limit is not None and total_row_count + current_batch_size >= self._limit:
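For clarity, here is a simplified, hypothetical standalone version of the lazy pattern the new loop follows (the helper name and the final-batch slicing are illustrative; the actual limit handling is in the truncated part of the diff):

```python
# Hypothetical sketch of the streaming-with-limit pattern: yield record batches
# as they arrive and stop at an optional row limit, trimming the final batch,
# without ever materializing a whole file. Not the actual pyiceberg code.
from typing import Iterator, Optional

import pyarrow as pa


def stream_limited_batches(
    batches: Iterator[pa.RecordBatch],
    limit: Optional[int] = None,
) -> Iterator[pa.RecordBatch]:
    total_row_count = 0
    for batch in batches:
        if limit is not None and total_row_count + len(batch) >= limit:
            # Slice so that exactly `limit` rows are yielded in total, then stop.
            yield batch.slice(0, limit - total_row_count)
            return
        total_row_count += len(batch)
        yield batch
```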