Skip to content

Commit 70e1ee4

Browse files
committed
Backpressure and ordering in sync gen transform
1 parent ed56cd8 commit 70e1ee4

File tree

2 files changed

+10
-3
lines changed

2 files changed

+10
-3
lines changed

async_utils/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,4 @@
1313
# limitations under the License.
1414

1515

16-
__version__ = "7.0.4"
16+
__version__ = "2024.11.22"

async_utils/gen_transform.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ def _consumer(
3232
**kwargs: P.kwargs,
3333
) -> None:
3434
for val in f(*args, **kwargs):
35-
loop.call_soon_threadsafe(queue.put_nowait, val)
35+
# This ensures a strict ordering on other event loops
36+
# uvloop in particular caused this to be needed
37+
h = asyncio.run_coroutine_threadsafe(queue.put(val), loop)
38+
h.result()
3639

3740

3841
def sync_to_async_gen(
@@ -54,7 +57,11 @@ def sync_to_async_gen(
5457
If your generator is actually a synchronous coroutine, that's super cool,
5558
but rewrite is as a native coroutine or use it directly then, you don't need
5659
what this function does."""
57-
q: asyncio.Queue[YieldType] = asyncio.Queue()
60+
# Provides backpressure, ensuring the underlying sync generator in a thread is lazy
61+
# If the user doesn't want laziness, then using this method makes little sense, they could
62+
# trivially exhaust the generator in a thread with asyncio.to_thread(lambda g: list(g()), g)
63+
# to then use the values
64+
q: asyncio.Queue[YieldType] = asyncio.Queue(maxsize=1)
5865

5966
background_coro = asyncio.to_thread(_consumer, asyncio.get_running_loop(), q, f, *args, **kwargs)
6067
background_task = asyncio.create_task(background_coro)

0 commit comments

Comments
 (0)