From a75d6fa3add0ef4daac713488ee6b5e8f103d6a5 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 3 Dec 2024 18:46:50 +0000 Subject: [PATCH 1/4] expand the run_coroutine_threadsafe recipies --- Doc/library/asyncio-task.rst | 71 ++++++++++++++++++++++++++++++++---- 1 file changed, 63 insertions(+), 8 deletions(-) diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index f27e858cf420f4..bd5cebf92f7adc 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -1067,14 +1067,69 @@ Scheduling From Other Threads This function is meant to be called from a different OS thread than the one where the event loop is running. Example:: - # Create a coroutine - coro = asyncio.sleep(1, result=3) - - # Submit the coroutine to a given loop - future = asyncio.run_coroutine_threadsafe(coro, loop) - - # Wait for the result with an optional timeout argument - assert future.result(timeout) == 3 + # Create a contextvar in your module + ctx_loop: ContextVar[asyncio.AbstractEventLoop] = contextvars.ContextVar('ctx_loop') + + + def in_thread(): + # Get the loop from the context + loop = ctx_loop.get() + + # Run some blocking IO + pathlib.Path("example").read_text(encoding="utf8") + + # Create a coroutine + coro = asyncio.sleep(1, result=3) + + # Submit the coroutine to a given loop + future = asyncio.run_coroutine_threadsafe(coro, loop) + + # Wait for the result with an optional timeout argument + assert future.result(timeout) == 3 + + async def amain(): + # Set the loop in the ContextVar + token = ctx_loop.set(asyncio.get_running_loop()) + try: + # Run something in a thread + await asyncio.to_thread(in_thread) + finally: + # Reset the ContextVar + ctx_loop.reset(token) + + It's also possible to run the other way around. Example:: + + @contextlib.contextmanager + def loop_in_thread() -> Generator[asyncio.AbstractEventLoop] + loop_fut = concurrent.futures.Future[asyncio.AbstractEventLoop]() + stop_event = asyncio.Event() + + async def main() -> None: + loop_fut.set_result(asyncio.get_running_loop()) + await stop_event.wait() + + with concurrent.futures.ThreadPoolExecutor(1) as tpe: + complete_fut = tpe.submit(asyncio.run, main()) + for fut in concurrent.futures.as_completed((loop_fut, complete_fut)): + if fut is loop_fut: + loop = loop_fut.result() + try: + yield loop + finally: + loop.call_soon_threadsafe(stop_event.set) + else: + fut.result() + + # Create a loop in another thread + with loop_in_thread() as loop: + # Create a coroutine + coro = asyncio.sleep(1, result=3) + + # Submit the coroutine to a given loop + future = asyncio.run_coroutine_threadsafe(coro, loop) + + # Wait for the result with an optional timeout argument + assert future.result(timeout) == 3 If an exception is raised in the coroutine, the returned Future will be notified. It can also be used to cancel the task in From 93b58f92b011b9c85655ddb0c1f5c0e21608f156 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 3 Dec 2024 18:54:39 +0000 Subject: [PATCH 2/4] Apply suggestions from code review --- Doc/library/asyncio-task.rst | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index bd5cebf92f7adc..f723767a79fa36 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -1068,10 +1068,10 @@ Scheduling From Other Threads than the one where the event loop is running. Example:: # Create a contextvar in your module - ctx_loop: ContextVar[asyncio.AbstractEventLoop] = contextvars.ContextVar('ctx_loop') + ctx_loop: contextvars.ContextVar[asyncio.AbstractEventLoop]('ctx_loop') - def in_thread(): + def in_thread() -> None: # Get the loop from the context loop = ctx_loop.get() @@ -1085,9 +1085,9 @@ Scheduling From Other Threads future = asyncio.run_coroutine_threadsafe(coro, loop) # Wait for the result with an optional timeout argument - assert future.result(timeout) == 3 + assert future.result(timeout=2) == 3 - async def amain(): + async def amain() -> None: # Set the loop in the ContextVar token = ctx_loop.set(asyncio.get_running_loop()) try: @@ -1100,7 +1100,7 @@ Scheduling From Other Threads It's also possible to run the other way around. Example:: @contextlib.contextmanager - def loop_in_thread() -> Generator[asyncio.AbstractEventLoop] + def loop_in_thread() -> Generator[asyncio.AbstractEventLoop]: loop_fut = concurrent.futures.Future[asyncio.AbstractEventLoop]() stop_event = asyncio.Event() @@ -1129,7 +1129,7 @@ Scheduling From Other Threads future = asyncio.run_coroutine_threadsafe(coro, loop) # Wait for the result with an optional timeout argument - assert future.result(timeout) == 3 + assert future.result(timeout=2) == 3 If an exception is raised in the coroutine, the returned Future will be notified. It can also be used to cancel the task in From 8e9012d9d55a350d703359e70e7d33c99e3d4f9b Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Thu, 5 Dec 2024 15:23:34 +0000 Subject: [PATCH 3/4] simplify example --- Doc/library/asyncio-task.rst | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index f723767a79fa36..7b09171abe1c3e 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -1067,14 +1067,7 @@ Scheduling From Other Threads This function is meant to be called from a different OS thread than the one where the event loop is running. Example:: - # Create a contextvar in your module - ctx_loop: contextvars.ContextVar[asyncio.AbstractEventLoop]('ctx_loop') - - - def in_thread() -> None: - # Get the loop from the context - loop = ctx_loop.get() - + def in_thread(loop: asyncio.AbstractEventLoop) -> None: # Run some blocking IO pathlib.Path("example").read_text(encoding="utf8") @@ -1088,14 +1081,11 @@ Scheduling From Other Threads assert future.result(timeout=2) == 3 async def amain() -> None: - # Set the loop in the ContextVar - token = ctx_loop.set(asyncio.get_running_loop()) - try: - # Run something in a thread - await asyncio.to_thread(in_thread) - finally: - # Reset the ContextVar - ctx_loop.reset(token) + # Get the running loop + loop = asyncio.get_running_loop() + + # Run something in a thread + await asyncio.to_thread(in_thread, loop) It's also possible to run the other way around. Example:: From 0812a91b8d6b6a39c2da6ad9e630a45d5756200b Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sun, 29 Dec 2024 05:41:01 +0000 Subject: [PATCH 4/4] Update Doc/library/asyncio-task.rst Co-authored-by: Kumar Aditya --- Doc/library/asyncio-task.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index 7b09171abe1c3e..4541cf28de0605 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -1069,7 +1069,7 @@ Scheduling From Other Threads def in_thread(loop: asyncio.AbstractEventLoop) -> None: # Run some blocking IO - pathlib.Path("example").read_text(encoding="utf8") + pathlib.Path("example.txt").write_text("hello world", encoding="utf8") # Create a coroutine coro = asyncio.sleep(1, result=3)