Skip to content

Commit 6eb0895

Browse files
committed
move unblocking logic to queue_io
1 parent ec0201f commit 6eb0895

File tree

2 files changed

+8
-9
lines changed

2 files changed

+8
-9
lines changed

src/pyper/_core/async_helper/queue_io.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import asyncio
34
from collections.abc import AsyncIterable, Iterable
45
from typing import TYPE_CHECKING
56

@@ -65,9 +66,11 @@ async def __call__(self, *args, **kwargs):
6566
if isinstance(result, AsyncIterable):
6667
async for output in result:
6768
await self.q_out.put(output)
69+
await asyncio.sleep(0)
6870
elif isinstance(result := await result, Iterable):
6971
for output in result:
7072
await self.q_out.put(output)
73+
await asyncio.sleep(0)
7174
else:
7275
raise TypeError(f"got object of type {type(result)} from branching task {self.task.func} which could not be iterated over"
7376
" (the task should be a generator, or return an iterable)")

src/pyper/_core/util/asynchronize.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,18 @@
1010
def ascynchronize(task: Task, tp: ThreadPoolExecutor, pp: ProcessPoolExecutor) -> Task:
1111
"""Unify async and sync tasks as awaitable futures.
1212
1. If the task is async already, return it.
13-
2. Synchronous generators are transformed into asynchronous generators.
14-
3. Multiprocessed synchronous functions are wrapped in a call to `run_in_executor` using `ProcessPoolExecutor`.
15-
4. Threaded synchronous functions are wrapped in a call to `run_in_executor` using `ThreadPoolExecutor`.
13+
2. Multiprocessed synchronous functions are wrapped in a call to `run_in_executor` using `ProcessPoolExecutor`.
14+
3. Threaded synchronous functions are wrapped in a call to `run_in_executor` using `ThreadPoolExecutor`.
1615
"""
1716
if task.is_async:
1817
return task
1918

20-
if task.is_gen and task.branch:
21-
# Small optimization to convert sync generators to async generators
19+
if task.is_gen:
20+
# Small optimization to convert sync generators to async functions
2221
# This saves from having to use a thread/process just to get the generator object
23-
# We also add asyncio.sleep(0) to unblock long synchronous generators
2422
@functools.wraps(task.func)
2523
async def wrapper(*args, **kwargs):
26-
for output in task.func(*args, **kwargs):
27-
yield output
28-
await asyncio.sleep(0)
24+
return task.func(*args, **kwargs)
2925
else:
3026
executor = pp if task.multiprocess else tp
3127
@functools.wraps(task.func)

0 commit comments

Comments
 (0)