Skip to content

Conversation

@kylebarron
Copy link
Member

Which issue does this PR close?

Closes #974 .

Rationale for this change

Support async iteration of RecordBatchStream.

What changes are included in this PR?

  • Move into Rust raising StopIteration or AsyncStopIteration errors
  • Update typing
  • Add dependency on pyo3-async-runtimes to manage future conversion.
  • Move SendableRecordBatchStream into an Arc<Mutex<>>. This is required I think so that we can clone the stream into future_into_py

Are there any user-facing changes?

Adds async iterator support.

@kylebarron
Copy link
Member Author

If this looks good, we can add a test using pytest-asyncio

@timsaucer
Copy link
Member

At first glance, this looks like a very nice add. Do you need help resolving the test failures?

@kylebarron
Copy link
Member Author

This PR changes the behavior of stream.next() to raise StopIteration when there are no more batches available in the stream.

This matches default iterator behavior:

image

This also matches pyarrow.RecordBatchReader behavior

In [11]: import pyarrow as pa

In [12]: batch = pa.record_batch([])

In [13]: batch2 = pa.record_batch([])

In [15]: reader = pa.RecordBatchReader.from_batches(batch.schema, [batch, batch2])

In [16]: reader.read_next_batch()
Out[16]:
pyarrow.RecordBatch

----

In [17]: reader.read_next_batch()
Out[17]:
pyarrow.RecordBatch

----

In [18]: reader.read_next_batch()
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
Cell In[18], line 1
----> 1 reader.read_next_batch()

File ~/.pyenv/versions/3.11.8/lib/python3.11/site-packages/pyarrow/ipc.pxi:708, in pyarrow.lib.RecordBatchReader.read_next_batch()

StopIteration:

@kylebarron
Copy link
Member Author

I'd like to add a test with pytest-asyncio, but I don't know how to add the dependency, ref #977

Copy link
Member

@timsaucer timsaucer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that I've had a chance to actually review, this looks great. Can we add an issue to track the unit test for pytest-asyncio after the update to uv for package management?

@timsaucer timsaucer merged commit 4b262be into apache:main Jan 9, 2025
23 checks passed
@kylebarron
Copy link
Member Author

Thanks for making that update!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add async iterator support to RecordBatchStream

2 participants