Add Arrow C streaming, DataFrame iteration, and OOM-safe streaming execution #1222
base: main
Changes from 59 commits
@@ -145,10 +145,118 @@ To materialize the results of your DataFrame operations:
    # Display results
    df.show()  # Print tabular format to console

    # Count rows
    count = df.count()

Zero-copy streaming to Arrow-based Python libraries
---------------------------------------------------

DataFusion DataFrames implement the ``__arrow_c_stream__`` protocol, enabling
zero-copy, lazy streaming into Arrow-based Python libraries. With the streaming
protocol, batches are produced on demand, so you can process arbitrarily large
results without out-of-memory errors.

.. note::

    The protocol is implementation-agnostic and works with any Python library
    that understands the Arrow C streaming interface (for example, PyArrow or
    other Arrow-compatible implementations). The sections below provide a
    short PyArrow-specific example and general guidance for other
    implementations.

PyArrow
^^^^^^^

.. code-block:: python

    import pyarrow as pa

    # Create a PyArrow RecordBatchReader without materializing all batches
    reader = pa.RecordBatchReader.from_stream(df)
    for batch in reader:
        ...  # process each batch as it is produced

DataFrames are also iterable, yielding :class:`datafusion.RecordBatch`
objects lazily, so you can loop over results directly without importing
PyArrow:

.. code-block:: python

    for batch in df:
        ...  # each batch is a ``datafusion.RecordBatch``

Each batch exposes ``to_pyarrow()``, which converts it to a
:class:`pyarrow.RecordBatch`, while ``pa.table(df)`` collects the entire
DataFrame eagerly into a PyArrow table:

.. code-block:: python

    import pyarrow as pa

    table = pa.table(df)
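
To convert batch by batch instead of materializing everything at once, you
can combine DataFrame iteration with ``to_pyarrow()``. A minimal sketch (the
running row count is purely illustrative):

.. code-block:: python

    total_rows = 0
    for batch in df:
        pa_batch = batch.to_pyarrow()  # pyarrow.RecordBatch for this batch
        total_rows += pa_batch.num_rows  # process incrementally, batch by batch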
Asynchronous iteration is supported as well, allowing integration with
``asyncio`` event loops:

.. code-block:: python

    async for batch in df:
        ...  # process each batch as it is produced
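
Because ``async for`` is only valid inside a coroutine, a complete, runnable
version looks like the following sketch (the ``main`` wrapper is
illustrative):

.. code-block:: python

    import asyncio

    async def main() -> None:
        async for batch in df:
            ...  # process each batch as it is produced

    asyncio.run(main())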

Execute as Stream
^^^^^^^^^^^^^^^^^

For finer control over streaming execution, use
:py:meth:`~datafusion.DataFrame.execute_stream` to obtain a
:py:class:`datafusion.RecordBatchStream`:

.. code-block:: python

    stream = df.execute_stream()
    for batch in stream:
        ...  # process each batch as it is produced

.. tip::

    To get a PyArrow reader instead, call
    ``pa.RecordBatchReader.from_stream(df)``.

When partition boundaries are important,
:py:meth:`~datafusion.DataFrame.execute_stream_partitioned` returns an
iterable of :py:class:`datafusion.RecordBatchStream` objects, one per
partition:

.. code-block:: python

    for stream in df.execute_stream_partitioned():
        for batch in stream:
            ...  # each stream yields RecordBatches

[Review comment] Interesting. Can these streams be polled concurrently? Can you do streams = list(df.execute_stream_partitioned()) and then concurrently iterate over all the streams, yielding whatever batch comes in first? I suppose that would just do in Python what
[Reply] Good question!

To process partitions concurrently, first collect the streams into a list and
then poll each one in a separate ``asyncio`` task, so whichever partition
produces a batch first is consumed first:

.. code-block:: python

    import asyncio

    async def consume(stream):
        async for batch in stream:
            ...

    async def main() -> None:
        streams = list(df.execute_stream_partitioned())
        await asyncio.gather(*(consume(s) for s in streams))

    asyncio.run(main())

See :doc:`../io/arrow` for additional details on the Arrow interface.

HTML Rendering
--------------
@@ -25,7 +25,9 @@
from typing import (
    TYPE_CHECKING,
    Any,
    AsyncIterator,
    Iterable,
    Iterator,
    Literal,
    Optional,
    Union,
@@ -42,7 +44,7 @@
from datafusion._internal import ParquetWriterOptions as ParquetWriterOptionsInternal
from datafusion.expr import Expr, SortExpr, sort_or_default
from datafusion.plan import ExecutionPlan, LogicalPlan
-from datafusion.record_batch import RecordBatchStream
+from datafusion.record_batch import RecordBatch, RecordBatchStream

if TYPE_CHECKING:
    import pathlib
@@ -296,6 +298,9 @@ def __init__(
class DataFrame:
    """Two dimensional table representation of data.

    DataFrame objects are iterable; iterating over a DataFrame yields
    :class:`datafusion.RecordBatch` instances lazily.

    See :ref:`user_guide_concepts` in the online documentation for more information.
    """
@@ -312,7 +317,7 @@ def into_view(self) -> pa.Table:
        return self.df.into_view()

    def __getitem__(self, key: str | list[str]) -> DataFrame:
-        """Return a new :py:class`DataFrame` with the specified column or columns.
+        """Return a new :py:class:`DataFrame` with the specified column or columns.

        Args:
            key: Column name or list of column names to select.
@@ -1105,21 +1110,55 @@ def unnest_columns(self, *columns: str, preserve_nulls: bool = True) -> DataFrame:
        return DataFrame(self.df.unnest_columns(columns, preserve_nulls=preserve_nulls))

    def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
-        """Export an Arrow PyCapsule Stream.
+        """Export the DataFrame as an Arrow C Stream.

        The DataFrame is executed using DataFusion's streaming APIs and exposed via
        Arrow's C Stream interface. Record batches are produced incrementally, so the
        full result set is never materialized in memory.

-        This will execute and collect the DataFrame. We will attempt to respect the
-        requested schema, but only trivial transformations will be applied such as only
-        returning the fields listed in the requested schema if their data types match
-        those in the DataFrame.
+        When ``requested_schema`` is provided, DataFusion applies only simple
+        projections such as selecting a subset of existing columns or reordering
+        them. Column renaming, computed expressions, or type coercion are not
+        supported through this interface.

        Args:
-            requested_schema: Attempt to provide the DataFrame using this schema.
+            requested_schema: Either a :py:class:`pyarrow.Schema` or an Arrow C
+                Schema capsule (``PyCapsule``) produced by
+                ``schema._export_to_c_capsule()``. The DataFrame will attempt to
+                align its output with the fields and order specified by this schema.

        Returns:
-            Arrow PyCapsule object.
+            Arrow ``PyCapsule`` object representing an ``ArrowArrayStream``.

        Examples:
            >>> schema = df.schema()
-            >>> stream = df.__arrow_c_stream__(schema)
+            >>> capsule = schema._export_to_c_capsule()
+            >>> stream = df.__arrow_c_stream__(capsule)

        Notes:
            The Arrow C Data Interface PyCapsule details are documented by Apache
            Arrow and can be found at:
            https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
        """
        # ``DataFrame.__arrow_c_stream__`` in the Rust extension leverages
        # ``execute_stream_partitioned`` under the hood to stream batches while
        # preserving the original partition order.
        return self.df.__arrow_c_stream__(requested_schema)

[Review comment] Might be good to have a link somewhere in the docstring to https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
[Reply] Added

    def __iter__(self) -> Iterator[RecordBatch]:
        """Return an iterator over this DataFrame's record batches."""
        return iter(self.execute_stream())

    def __aiter__(self) -> AsyncIterator[RecordBatch]:
        """Return an async iterator over this DataFrame's record batches.

        ``RecordBatchStream`` implements ``__aiter__``, so return the stream
        directly to remain compatible with Python < 3.10 (this project
        supports Python >= 3.6).
        """
        return self.execute_stream()

    def transform(self, func: Callable[..., DataFrame], *args: Any) -> DataFrame:
        """Apply a function to the current DataFrame which returns another DataFrame.
@@ -46,6 +46,26 @@ def to_pyarrow(self) -> pa.RecordBatch:
"""Convert to :py:class:`pa.RecordBatch`.""" | ||
return self.record_batch.to_pyarrow() | ||
|
||
def __arrow_c_array__( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 👍 |
||
self, requested_schema: object | None = None | ||
) -> tuple[object, object]: | ||
"""Export the record batch via the Arrow C Data Interface. | ||
|
||
This allows zero-copy interchange with libraries that support the | ||
`Arrow PyCapsule interface <https://arrow.apache.org/docs/format/ | ||
CDataInterface/PyCapsuleInterface.html>`_. | ||
|
||
Args: | ||
requested_schema: Attempt to provide the record batch using this | ||
schema. Only straightforward projections such as column | ||
selection or reordering are applied. | ||
|
||
Returns: | ||
Two Arrow PyCapsule objects representing the ``ArrowArray`` and | ||
``ArrowSchema``. | ||
""" | ||
return self.record_batch.__arrow_c_array__(requested_schema) | ||
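
    # Editor's sketch (assumption: a recent PyArrow release that can import
    # objects implementing ``__arrow_c_array__`` via the PyCapsule protocol):
    #
    #     import pyarrow as pa
    #     pa_batch = pa.record_batch(batch)  # rebuilds a pyarrow.RecordBatch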

[Review comment] 👍 👍

class RecordBatchStream:
    """This class represents a stream of record batches.
@@ -63,19 +83,19 @@ def next(self) -> RecordBatch:
        return next(self)

    async def __anext__(self) -> RecordBatch:
-        """Async iterator function."""
+        """Return the next :py:class:`RecordBatch` in the stream asynchronously."""
        next_batch = await self.rbs.__anext__()
        return RecordBatch(next_batch)

    def __next__(self) -> RecordBatch:
-        """Iterator function."""
+        """Return the next :py:class:`RecordBatch` in the stream."""
        next_batch = next(self.rbs)
        return RecordBatch(next_batch)

    def __aiter__(self) -> typing_extensions.Self:
-        """Async iterator function."""
+        """Return an asynchronous iterator over record batches."""
        return self

    def __iter__(self) -> typing_extensions.Self:
-        """Iterator function."""
+        """Return an iterator over record batches."""
        return self
[Review comment] Are most of the changes here just formatting? Did you just add `cstr`?