1919from collections .abc import Awaitable , Generator
2020from concurrent .futures import Future
2121from contextlib import contextmanager
22- from typing import Any , TypeAlias , TypeVar
22+ from typing import Any , TypeVar
2323
2424_T = TypeVar ("_T" )
2525
26- _FutureLike : TypeAlias = asyncio .Future [_T ] | Awaitable [_T ]
26+ type _FutureLike [ _T ] = asyncio .Future [_T ] | Awaitable [_T ]
2727
2828__all__ = ["threaded_loop" ]
2929
@@ -32,6 +32,9 @@ class LoopWrapper:
3232 def __init__ (self , loop : asyncio .AbstractEventLoop ):
3333 self ._loop = loop
3434
35+ def stop (self ) -> None :
36+ self ._loop .call_soon_threadsafe (self ._loop .stop )
37+
3538 def schedule (self , coro : _FutureLike [_T ]) -> Future [_T ]:
3639 """Schedule a coroutine to run on the wrapped event loop"""
3740 return asyncio .run_coroutine_threadsafe (coro , self ._loop )
@@ -52,12 +55,14 @@ def future_done_callback(_f: Future[Any]) -> object:
5255 return future .result ()
5356
5457
55- def run_forever (loop : asyncio .AbstractEventLoop ) -> None :
58+ def run_forever (loop : asyncio .AbstractEventLoop , use_eager_task_factory : bool , / ) -> None :
5659 asyncio .set_event_loop (loop )
60+ if use_eager_task_factory :
61+ loop .set_task_factory (asyncio .eager_task_factory )
5762 try :
5863 loop .run_forever ()
5964 finally :
60- loop .run_until_complete (asyncio .sleep (0.05 ))
65+ loop .run_until_complete (asyncio .sleep (0 ))
6166 tasks : set [asyncio .Task [Any ]] = {t for t in asyncio .all_tasks (loop ) if not t .done ()}
6267 for t in tasks :
6368 t .cancel ()
@@ -83,7 +88,7 @@ def run_forever(loop: asyncio.AbstractEventLoop) -> None:
8388
8489
8590@contextmanager
86- def threaded_loop () -> Generator [LoopWrapper , None , None ]:
91+ def threaded_loop (* , use_eager_task_factory : bool = True ) -> Generator [LoopWrapper , None , None ]:
8792 """Starts an event loop on a background thread,
8893 and yields an object with scheduling methods for interacting with
8994 the loop.
@@ -92,7 +97,7 @@ def threaded_loop() -> Generator[LoopWrapper, None, None]:
9297 loop = asyncio .new_event_loop ()
9398 thread = None
9499 try :
95- thread = threading .Thread (target = run_forever , args = (loop ,))
100+ thread = threading .Thread (target = run_forever , args = (loop , use_eager_task_factory ))
96101 thread .start ()
97102 yield LoopWrapper (loop )
98103 finally :
0 commit comments