
Conversation


@tom-at-rewbi tom-at-rewbi commented Oct 31, 2025

Rationale for this change

My expectation for to_record_batches was that it would yield batches and not materialize an entire parquet file in memory, but it looks like the current implementation explicitly does this.

This change makes to_record_batches iterate batches lazily.
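
For context, a minimal sketch of the intended behavioral difference (simplified and hypothetical; read_task stands in for the internal per-task reader and this is not the actual PyIceberg code):

from typing import Callable, Iterable, Iterator
import pyarrow as pa

def to_record_batches_eager(tasks: Iterable, read_task: Callable) -> Iterator[pa.RecordBatch]:
    for task in tasks:
        # list(...) materializes every batch of the file before the first one is yielded
        for batch in list(read_task(task)):
            yield batch

def to_record_batches_lazy(tasks: Iterable, read_task: Callable) -> Iterator[pa.RecordBatch]:
    for task in tasks:
        # yield from pulls one batch at a time from the underlying reader
        yield from read_task(task)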

Are these changes tested?

I ran make test and all tests completed successfully except the two that import kerberos, as I do not have it installed and it does not currently build for me.

As for testing that this reduces memory usage, this change made my data pipeline stop OOM'ing.

Are there any user-facing changes?

There should not be any user-facing changes.

Comment on lines -1729 to -1735
executor = ExecutorFactory.get_or_create()

def batches_for_task(task: FileScanTask) -> List[pa.RecordBatch]:
    # Materialize the iterator here to ensure execution happens within the executor.
    # Otherwise, the iterator would be lazily consumed later (in the main thread),
    # defeating the purpose of using executor.map.
    return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
Contributor


This logic was added in #1995 to parallelize the work using the executor.

My expectation for to_record_batches was that it would yield batches and not materialize an entire parquet file in memory, but it looks like the current implementation explicitly does this.

That would be my expectation as well. The current implementation reads an entire Parquet file into memory.

But I do still want to keep the ability to parallelize reading.
Let's see if there's a better way to refactor this.
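
One possible direction, sketched only to make the trade-off concrete (a generic, hypothetical helper, not this PR's implementation or a concrete proposal): keep an executor for cross-task parallelism, but bound how many tasks are materialized ahead of the consumer instead of mapping over every task at once.

from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")

def bounded_parallel_flatten(
    items: Iterable[T],
    fn: Callable[[T], List[R]],
    max_inflight: int = 2,
) -> Iterator[R]:
    # Submit tasks to a thread pool, but never let more than max_inflight
    # materialized results pile up ahead of whoever consumes the iterator.
    with ThreadPoolExecutor(max_workers=max_inflight) as pool:
        pending = []
        for item in items:
            pending.append(pool.submit(fn, item))
            if len(pending) > max_inflight:
                # Drain the oldest task before submitting more work.
                yield from pending.pop(0).result()
        for future in pending:
            yield from future.result()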

Author

@tom-at-rewbi tom-at-rewbi Nov 3, 2025


If I'm not mistaken, reading is still parallel after this change, albeit in a different way. Every time a scan task is processed, pyarrow.dataset.Scanner.from_fragment is called with the default value of use_threads, which enables maximum parallelism. (link to source)

pyarrow.dataset.Scanner also has configurable options for readahead (also enabled by default), which I assume are parallelized, but I am not sure.
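
To illustrate the pyarrow-level parallelism being referred to (a standalone example with a hypothetical file path; parameter names as in recent pyarrow versions, not PyIceberg code):

import pyarrow.dataset as ds

dataset = ds.dataset("data/example.parquet", format="parquet")  # hypothetical path
scanner = dataset.scanner(
    use_threads=True,    # the default: decoding is parallelized across CPU cores
    batch_readahead=16,  # prefetch a bounded number of batches ahead of the consumer
)
for batch in scanner.to_batches():  # yields pa.RecordBatch objects lazily
    ...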

Are there any situations where we would expect stacking these two forms of parallelism to be beneficial?
