Skip to content

Commit cb25580

Browse files
committed
Added generator to observable
1 parent cc90ad0 commit cb25580

File tree

2 files changed

+29
-1
lines changed

2 files changed

+29
-1
lines changed

graphql/execution/executors/asyncio.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,15 @@ def ensure_future(coro_or_future, loop=None):
2525
del task._source_traceback[-1]
2626
return task
2727
else:
28-
raise TypeError('A Future, a coroutine or an awaitable is required')
28+
raise TypeError(
29+
'A Future, a coroutine or an awaitable is required')
30+
31+
try:
32+
from .asyncio_utils import asyncgen_to_observable, isasyncgen
33+
except:
34+
def isasyncgen(obj): False
35+
36+
def asyncgen_to_observable(asyncgen): pass
2937

3038

3139
class AsyncioExecutor(object):
@@ -50,4 +58,6 @@ def execute(self, fn, *args, **kwargs):
5058
future = ensure_future(result, loop=self.loop)
5159
self.futures.append(future)
5260
return Promise.resolve(future)
61+
elif isasyncgen(result):
62+
return asyncgen_to_observable(result)
5363
return result
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from inspect import isasyncgen
2+
from asyncio import ensure_future
3+
from rx import Observable
4+
5+
6+
def asyncgen_to_observable(asyncgen):
7+
def emit(observer):
8+
ensure_future(iterate_asyncgen(asyncgen, observer))
9+
return Observable.create(emit)
10+
11+
12+
async def iterate_asyncgen(asyncgen, observer):
13+
try:
14+
async for item in asyncgen:
15+
observer.on_next(item)
16+
observer.on_completed()
17+
except Exception as e:
18+
observer.on_error(e)

0 commit comments

Comments
 (0)