-
-
Notifications
You must be signed in to change notification settings - Fork 33.2k
expand the run_coroutine_threadsafe recipies #127576
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
a75d6fa
9e6259b
93b58f9
8e9012d
1ffc0c6
0812a91
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1067,14 +1067,59 @@ Scheduling From Other Threads | |
This function is meant to be called from a different OS thread | ||
than the one where the event loop is running. Example:: | ||
|
||
# Create a coroutine | ||
coro = asyncio.sleep(1, result=3) | ||
|
||
# Submit the coroutine to a given loop | ||
future = asyncio.run_coroutine_threadsafe(coro, loop) | ||
|
||
# Wait for the result with an optional timeout argument | ||
assert future.result(timeout) == 3 | ||
def in_thread(loop: asyncio.AbstractEventLoop) -> None: | ||
# Run some blocking IO | ||
pathlib.Path("example").read_text(encoding="utf8") | ||
|
||
# Create a coroutine | ||
coro = asyncio.sleep(1, result=3) | ||
|
||
# Submit the coroutine to a given loop | ||
future = asyncio.run_coroutine_threadsafe(coro, loop) | ||
|
||
# Wait for the result with an optional timeout argument | ||
assert future.result(timeout=2) == 3 | ||
|
||
async def amain() -> None: | ||
# Get the running loop | ||
loop = asyncio.get_running_loop() | ||
|
||
# Run something in a thread | ||
await asyncio.to_thread(in_thread, loop) | ||
|
||
It's also possible to run the other way around. Example:: | ||
|
||
@contextlib.contextmanager | ||
def loop_in_thread() -> Generator[asyncio.AbstractEventLoop]: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FWIW I think the above simpler example is sufficient, this one looks very contrived to me involving thread pool executor and all. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's used to get any failures out, if they happen when constructing the loop There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I meant that can we avoid mixing concurrent futures executors with loop? Starting loop in a thread should be sufficient here no? I don't want to encourage such code which creates loop in another thread but runs code from another thread especially using concurrent futures. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You need to be able to start and stop the thread, and collect errors that get raised from asyncio.run, it's complicated to do that with threading.Thread There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am thinking of something like this: from threading import Thread
import contextlib
import asyncio
import concurrent.futures
@contextlib.contextmanager
def get_loop():
loop_fut = concurrent.futures.Future()
stop = asyncio.Event()
async def runner():
loop_fut.set_result(asyncio.get_event_loop())
await stop.wait()
try:
t = Thread(target=lambda: asyncio.run(runner()))
t.start()
loop = loop_fut.result()
yield loop
finally:
loop.call_soon_threadsafe(stop.set)
t.join()
with get_loop() as loop:
r = asyncio.run_coroutine_threadsafe(
asyncio.sleep(1, result="hello"), loop
).result()
print(r) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But ThreadPoolExecutor(1) does all that for you, and collects any result from asyncio.run. Eg your code will hang if the OS has run out of file descriptors, mine will propagate the exception There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay I take it, I am not really a fan of this pattern but I am fine if you wish. |
||
loop_fut = concurrent.futures.Future[asyncio.AbstractEventLoop]() | ||
stop_event = asyncio.Event() | ||
|
||
async def main() -> None: | ||
loop_fut.set_result(asyncio.get_running_loop()) | ||
await stop_event.wait() | ||
|
||
with concurrent.futures.ThreadPoolExecutor(1) as tpe: | ||
complete_fut = tpe.submit(asyncio.run, main()) | ||
for fut in concurrent.futures.as_completed((loop_fut, complete_fut)): | ||
kumaraditya303 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if fut is loop_fut: | ||
loop = loop_fut.result() | ||
try: | ||
yield loop | ||
finally: | ||
loop.call_soon_threadsafe(stop_event.set) | ||
else: | ||
fut.result() | ||
|
||
# Create a loop in another thread | ||
with loop_in_thread() as loop: | ||
# Create a coroutine | ||
coro = asyncio.sleep(1, result=3) | ||
|
||
# Submit the coroutine to a given loop | ||
future = asyncio.run_coroutine_threadsafe(coro, loop) | ||
|
||
# Wait for the result with an optional timeout argument | ||
assert future.result(timeout=2) == 3 | ||
|
||
If an exception is raised in the coroutine, the returned Future | ||
will be notified. It can also be used to cancel the task in | ||
|
Uh oh!
There was an error while loading. Please reload this page.