Skip to content

Commit 3bb34b8

Browse files
committed
Plugged rx Disposables into async iterators
So we can stop a subscribed observable and therefore stop the associated async iterator
1 parent 1f57876 commit 3bb34b8

File tree

1 file changed

+119
-2
lines changed

1 file changed

+119
-2
lines changed

graphql/execution/executors/asyncio_utils.py

Lines changed: 119 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,129 @@
11
from inspect import isasyncgen
22
from asyncio import ensure_future
3-
from rx import Observable
3+
from rx import Observable, AnonymousObserver
4+
from rx.core import ObservableBase, Disposable, ObserverBase
5+
6+
from rx.concurrency import current_thread_scheduler
7+
8+
from rx.core import Observer, Observable, Disposable
9+
from rx.core.anonymousobserver import AnonymousObserver
10+
from rx.core.autodetachobserver import AutoDetachObserver
11+
12+
13+
# class AsyncgenDisposable(Disposable):
14+
# """Represents a Disposable that disposes the asyncgen automatically."""
15+
16+
# def __init__(self, asyncgen):
17+
# """Initializes a new instance of the AsyncgenDisposable class."""
18+
19+
# self.asyncgen = asyncgen
20+
# self.is_disposed = False
21+
22+
# super(AsyncgenDisposable, self).__init__()
23+
24+
# def dispose(self):
25+
# """Sets the status to disposed"""
26+
# self.asyncgen.aclose()
27+
# self.is_disposed = True
28+
29+
30+
class AsyncgenObserver(AutoDetachObserver):
31+
def __init__(self, asyncgen, *args, **kwargs):
32+
self._asyncgen = asyncgen
33+
self.is_disposed = False
34+
super(AsyncgenObserver, self).__init__(*args, **kwargs)
35+
36+
async def dispose_asyncgen(self):
37+
if self.is_disposed:
38+
return
39+
40+
try:
41+
# await self._asyncgen.aclose()
42+
await self._asyncgen.athrow(StopAsyncIteration)
43+
self.is_disposed = True
44+
except:
45+
pass
46+
47+
def dispose(self):
48+
if self.is_disposed:
49+
return
50+
disposed = super(AsyncgenObserver, self).dispose()
51+
# print("DISPOSE observer!", disposed)
52+
ensure_future(self.dispose_asyncgen())
53+
54+
55+
class AsyncgenObservable(ObservableBase):
56+
"""Class to create an Observable instance from a delegate-based
57+
implementation of the Subscribe method."""
58+
59+
def __init__(self, subscribe, asyncgen):
60+
"""Creates an observable sequence object from the specified
61+
subscription function.
62+
63+
Keyword arguments:
64+
:param types.FunctionType subscribe: Subscribe method implementation.
65+
"""
66+
67+
self._subscribe = subscribe
68+
self._asyncgen = asyncgen
69+
super(AsyncgenObservable, self).__init__()
70+
71+
def _subscribe_core(self, observer):
72+
# print("GET SUBSCRIBER", observer)
73+
return self._subscribe(observer)
74+
# print("SUBSCRIBER RESULT", subscriber)
75+
# return subscriber
76+
77+
def subscribe(self, on_next=None, on_error=None, on_completed=None, observer=None):
78+
79+
if isinstance(on_next, Observer):
80+
observer = on_next
81+
elif hasattr(on_next, "on_next") and callable(on_next.on_next):
82+
observer = on_next
83+
elif not observer:
84+
observer = AnonymousObserver(on_next, on_error, on_completed)
85+
86+
auto_detach_observer = AsyncgenObserver(self._asyncgen, observer)
87+
88+
def fix_subscriber(subscriber):
89+
"""Fixes subscriber to make sure it returns a Disposable instead
90+
of None or a dispose function"""
91+
92+
if not hasattr(subscriber, "dispose"):
93+
subscriber = Disposable.create(subscriber)
94+
95+
return subscriber
96+
97+
def set_disposable(scheduler=None, value=None):
98+
try:
99+
subscriber = self._subscribe_core(auto_detach_observer)
100+
except Exception as ex:
101+
if not auto_detach_observer.fail(ex):
102+
raise
103+
else:
104+
auto_detach_observer.disposable = fix_subscriber(subscriber)
105+
106+
# Subscribe needs to set up the trampoline before for subscribing.
107+
# Actually, the first call to Subscribe creates the trampoline so
108+
# that it may assign its disposable before any observer executes
109+
# OnNext over the CurrentThreadScheduler. This enables single-
110+
# threaded cancellation
111+
# https://social.msdn.microsoft.com/Forums/en-US/eb82f593-9684-4e27-
112+
# 97b9-8b8886da5c33/whats-the-rationale-behind-how-currentthreadsche
113+
# dulerschedulerequired-behaves?forum=rx
114+
if current_thread_scheduler.schedule_required():
115+
current_thread_scheduler.schedule(set_disposable)
116+
else:
117+
set_disposable()
118+
119+
# Hide the identity of the auto detach observer
120+
return Disposable.create(auto_detach_observer.dispose)
4121

5122

6123
def asyncgen_to_observable(asyncgen):
7124
def emit(observer):
8125
ensure_future(iterate_asyncgen(asyncgen, observer))
9-
return Observable.create(emit)
126+
return AsyncgenObservable(emit, asyncgen)
10127

11128

12129
async def iterate_asyncgen(asyncgen, observer):

0 commit comments

Comments
 (0)