|
1 |
| -from asyncio import ensure_future |
| 1 | +from asyncio import ensure_future, wait, CancelledError |
2 | 2 | from inspect import isasyncgen
|
3 | 3 |
|
4 | 4 | from rx import AnonymousObserver, Observable
|
|
7 | 7 | ObserverBase)
|
8 | 8 | from rx.core.anonymousobserver import AnonymousObserver
|
9 | 9 | from rx.core.autodetachobserver import AutoDetachObserver
|
| 10 | +from rx import AnonymousObservable |
10 | 11 |
|
11 | 12 |
|
12 | 13 | # class AsyncgenDisposable(Disposable):
|
@@ -102,33 +103,24 @@ def set_disposable(scheduler=None, value=None):
|
102 | 103 | else:
|
103 | 104 | auto_detach_observer.disposable = fix_subscriber(subscriber)
|
104 | 105 |
|
105 |
| - # Subscribe needs to set up the trampoline before for subscribing. |
106 |
| - # Actually, the first call to Subscribe creates the trampoline so |
107 |
| - # that it may assign its disposable before any observer executes |
108 |
| - # OnNext over the CurrentThreadScheduler. This enables single- |
109 |
| - # threaded cancellation |
110 |
| - # https://social.msdn.microsoft.com/Forums/en-US/eb82f593-9684-4e27- |
111 |
| - # 97b9-8b8886da5c33/whats-the-rationale-behind-how-currentthreadsche |
112 |
| - # dulerschedulerequired-behaves?forum=rx |
113 |
| - if current_thread_scheduler.schedule_required(): |
114 |
| - current_thread_scheduler.schedule(set_disposable) |
115 |
| - else: |
116 |
| - set_disposable() |
| 106 | + def dispose(): |
| 107 | + async def await_task(): |
| 108 | + await task |
117 | 109 |
|
118 |
| - # Hide the identity of the auto detach observer |
119 |
| - return Disposable.create(auto_detach_observer.dispose) |
| 110 | + task.cancel() |
| 111 | + ensure_future(await_task(), loop=loop) |
120 | 112 |
|
| 113 | + return dispose |
121 | 114 |
|
122 |
| -def asyncgen_to_observable(asyncgen): |
123 |
| - def emit(observer): |
124 |
| - ensure_future(iterate_asyncgen(asyncgen, observer)) |
125 |
| - return AsyncgenObservable(emit, asyncgen) |
| 115 | + return AnonymousObservable(emit) |
126 | 116 |
|
127 | 117 |
|
128 | 118 | async def iterate_asyncgen(asyncgen, observer):
|
129 | 119 | try:
|
130 | 120 | async for item in asyncgen:
|
131 | 121 | observer.on_next(item)
|
132 | 122 | observer.on_completed()
|
| 123 | + except CancelledError: |
| 124 | + pass |
133 | 125 | except Exception as e:
|
134 | 126 | observer.on_error(e)
|
0 commit comments