From 78819d391efb73432473482d2a232a61c263cdd8 Mon Sep 17 00:00:00 2001 From: Ingmar Steen Date: Tue, 9 Sep 2025 11:01:15 +0200 Subject: [PATCH 1/8] Wrap task locals loop and context in Arc. --- src/async_std.rs | 2 +- src/generic.rs | 12 ++++++------ src/lib.rs | 17 +++++++++-------- src/tokio.rs | 2 +- 4 files changed, 17 insertions(+), 16 deletions(-) diff --git a/src/async_std.rs b/src/async_std.rs index 9262027..7aa664a 100644 --- a/src/async_std.rs +++ b/src/async_std.rs @@ -94,7 +94,7 @@ impl ContextExt for AsyncStdRuntime { .try_with(|c| { c.borrow() .as_ref() - .map(|locals| Python::attach(|py| locals.clone_ref(py))) + .map(|locals| locals.clone_ref()) }) .unwrap_or_default() } diff --git a/src/generic.rs b/src/generic.rs index e8853e1..4707cf6 100644 --- a/src/generic.rs +++ b/src/generic.rs @@ -91,7 +91,7 @@ where R: ContextExt, { if let Some(locals) = R::get_task_locals() { - Ok(locals.event_loop.into_bound(py)) + Ok(locals.event_loop.clone_ref(py).into_bound(py)) } else { get_running_loop(py) } @@ -597,11 +597,11 @@ where let future_tx2 = future_tx1.clone_ref(py); R::spawn(async move { - let locals2 = Python::attach(|py| locals.clone_ref(py)); + let locals2 = locals.clone_ref(); if let Err(e) = R::spawn(async move { let result = R::scope( - Python::attach(|py| locals2.clone_ref(py)), + locals2.clone_ref(), Cancellable::new_with_cancel_rx(fut, cancel_rx), ) .await; @@ -1002,11 +1002,11 @@ where let future_tx2 = future_tx1.clone_ref(py); R::spawn_local(async move { - let locals2 = Python::attach(|py| locals.clone_ref(py)); + let locals2 = locals.clone_ref(); if let Err(e) = R::spawn_local(async move { let result = R::scope_local( - Python::attach(|py| locals2.clone_ref(py)), + locals2.clone_ref(), Cancellable::new_with_cancel_rx(fut, cancel_rx), ) .await; @@ -1510,7 +1510,7 @@ impl SenderGlue { self.tx .lock() .unwrap() - .send(py, self.locals.clone_ref(py), item) + .send(py, self.locals.clone_ref(), item) }) } pub fn close(&mut self) -> PyResult<()> { diff --git a/src/lib.rs b/src/lib.rs index 15d2160..e6c2a6c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -395,6 +395,7 @@ pub mod doc_test { } use std::future::Future; +use std::sync::Arc; use futures::channel::oneshot; use pyo3::{call::PyCallArgs, prelude::*, sync::PyOnceLock, types::PyDict}; @@ -472,17 +473,17 @@ fn copy_context(py: Python) -> PyResult> { #[derive(Debug)] pub struct TaskLocals { /// Track the event loop of the Python task - event_loop: Py, + event_loop: Arc>, /// Track the contextvars of the Python task - context: Py, + context: Arc>, } impl TaskLocals { /// At a minimum, TaskLocals must store the event loop. pub fn new(event_loop: Bound) -> Self { Self { - context: event_loop.py().None(), - event_loop: event_loop.into(), + context: Arc::new(event_loop.py().None()), + event_loop: Arc::new(event_loop.into()), } } @@ -494,7 +495,7 @@ impl TaskLocals { /// Manually provide the contextvars for the current task. pub fn with_context(self, context: Bound) -> Self { Self { - context: context.into(), + context: Arc::new(context.into()), ..self } } @@ -516,10 +517,10 @@ impl TaskLocals { /// Create a clone of the TaskLocals by incrementing the reference counters of the event loop and /// contextvars. - pub fn clone_ref(&self, py: Python<'_>) -> Self { + pub fn clone_ref(&self) -> Self { Self { - event_loop: self.event_loop.clone_ref(py), - context: self.context.clone_ref(py), + event_loop: self.event_loop.clone(), + context: self.context.clone(), } } } diff --git a/src/tokio.rs b/src/tokio.rs index 96fc7ee..c63c2a6 100644 --- a/src/tokio.rs +++ b/src/tokio.rs @@ -111,7 +111,7 @@ impl ContextExt for TokioRuntime { TASK_LOCALS .try_with(|c| { c.get() - .map(|locals| Python::attach(|py| locals.clone_ref(py))) + .map(|locals| locals.clone_ref()) }) .unwrap_or_default() } From 0ee77f23e4ff4ddec790731ef385c8724e55d916 Mon Sep 17 00:00:00 2001 From: Ingmar Steen Date: Tue, 9 Sep 2025 11:20:20 +0200 Subject: [PATCH 2/8] Fix tests. --- src/lib.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e6c2a6c..d6a4c70 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -93,7 +93,7 @@ //! let locals = pyo3_async_runtimes::TaskLocals::with_running_loop(py)?.copy_context(py)?; //! //! // Convert the async move { } block to a Python awaitable -//! pyo3_async_runtimes::tokio::future_into_py_with_locals(py, locals.clone_ref(py), async move { +//! pyo3_async_runtimes::tokio::future_into_py_with_locals(py, locals.clone_ref(), async move { //! let py_sleep = Python::attach(|py| { //! // Sometimes we need to call other async Python functions within //! // this future. In order for this to work, we need to track the @@ -162,9 +162,9 @@ //! //! pyo3_async_runtimes::tokio::future_into_py_with_locals( //! py, -//! locals.clone_ref(py), +//! locals.clone_ref(), //! // Store the current locals in task-local data -//! pyo3_async_runtimes::tokio::scope(locals.clone_ref(py), async move { +//! pyo3_async_runtimes::tokio::scope(locals.clone_ref(), async move { //! let py_sleep = Python::attach(|py| { //! pyo3_async_runtimes::into_future_with_locals( //! // Now we can get the current locals through task-local data @@ -189,9 +189,9 @@ //! //! pyo3_async_runtimes::tokio::future_into_py_with_locals( //! py, -//! locals.clone_ref(py), +//! locals.clone_ref(), //! // Store the current locals in task-local data -//! pyo3_async_runtimes::tokio::scope(locals.clone_ref(py), async move { +//! pyo3_async_runtimes::tokio::scope(locals.clone_ref(), async move { //! let py_sleep = Python::attach(|py| { //! pyo3_async_runtimes::into_future_with_locals( //! &pyo3_async_runtimes::tokio::get_current_locals(py)?, From cc58d9552fd5d869a67d7dfabfa249e4c0ce0b3a Mon Sep 17 00:00:00 2001 From: Ingmar Steen Date: Tue, 9 Sep 2025 15:25:47 +0200 Subject: [PATCH 3/8] Make TaskLocals properly cloneable. --- src/async_std.rs | 2 +- src/generic.rs | 10 +++++----- src/lib.rs | 14 ++++++++------ src/tokio.rs | 2 +- 4 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/async_std.rs b/src/async_std.rs index 7aa664a..07a660a 100644 --- a/src/async_std.rs +++ b/src/async_std.rs @@ -94,7 +94,7 @@ impl ContextExt for AsyncStdRuntime { .try_with(|c| { c.borrow() .as_ref() - .map(|locals| locals.clone_ref()) + .map(|locals| locals.clone()) }) .unwrap_or_default() } diff --git a/src/generic.rs b/src/generic.rs index 4707cf6..17d4fd8 100644 --- a/src/generic.rs +++ b/src/generic.rs @@ -597,11 +597,11 @@ where let future_tx2 = future_tx1.clone_ref(py); R::spawn(async move { - let locals2 = locals.clone_ref(); + let locals2 = locals.clone(); if let Err(e) = R::spawn(async move { let result = R::scope( - locals2.clone_ref(), + locals2.clone(), Cancellable::new_with_cancel_rx(fut, cancel_rx), ) .await; @@ -1002,11 +1002,11 @@ where let future_tx2 = future_tx1.clone_ref(py); R::spawn_local(async move { - let locals2 = locals.clone_ref(); + let locals2 = locals.clone(); if let Err(e) = R::spawn_local(async move { let result = R::scope_local( - locals2.clone_ref(), + locals2.clone(), Cancellable::new_with_cancel_rx(fut, cancel_rx), ) .await; @@ -1510,7 +1510,7 @@ impl SenderGlue { self.tx .lock() .unwrap() - .send(py, self.locals.clone_ref(), item) + .send(py, self.locals.clone(), item) }) } pub fn close(&mut self) -> PyResult<()> { diff --git a/src/lib.rs b/src/lib.rs index d6a4c70..ea44520 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -93,7 +93,7 @@ //! let locals = pyo3_async_runtimes::TaskLocals::with_running_loop(py)?.copy_context(py)?; //! //! // Convert the async move { } block to a Python awaitable -//! pyo3_async_runtimes::tokio::future_into_py_with_locals(py, locals.clone_ref(), async move { +//! pyo3_async_runtimes::tokio::future_into_py_with_locals(py, locals.clone(), async move { //! let py_sleep = Python::attach(|py| { //! // Sometimes we need to call other async Python functions within //! // this future. In order for this to work, we need to track the @@ -162,9 +162,9 @@ //! //! pyo3_async_runtimes::tokio::future_into_py_with_locals( //! py, -//! locals.clone_ref(), +//! locals.clone(), //! // Store the current locals in task-local data -//! pyo3_async_runtimes::tokio::scope(locals.clone_ref(), async move { +//! pyo3_async_runtimes::tokio::scope(locals.clone(), async move { //! let py_sleep = Python::attach(|py| { //! pyo3_async_runtimes::into_future_with_locals( //! // Now we can get the current locals through task-local data @@ -189,9 +189,9 @@ //! //! pyo3_async_runtimes::tokio::future_into_py_with_locals( //! py, -//! locals.clone_ref(), +//! locals.clone(), //! // Store the current locals in task-local data -//! pyo3_async_runtimes::tokio::scope(locals.clone_ref(), async move { +//! pyo3_async_runtimes::tokio::scope(locals.clone(), async move { //! let py_sleep = Python::attach(|py| { //! pyo3_async_runtimes::into_future_with_locals( //! &pyo3_async_runtimes::tokio::get_current_locals(py)?, @@ -514,10 +514,12 @@ impl TaskLocals { pub fn context<'p>(&self, py: Python<'p>) -> Bound<'p, PyAny> { self.context.clone_ref(py).into_bound(py) } +} +impl Clone for TaskLocals { /// Create a clone of the TaskLocals by incrementing the reference counters of the event loop and /// contextvars. - pub fn clone_ref(&self) -> Self { + fn clone(&self) -> Self { Self { event_loop: self.event_loop.clone(), context: self.context.clone(), diff --git a/src/tokio.rs b/src/tokio.rs index c63c2a6..20ea10d 100644 --- a/src/tokio.rs +++ b/src/tokio.rs @@ -111,7 +111,7 @@ impl ContextExt for TokioRuntime { TASK_LOCALS .try_with(|c| { c.get() - .map(|locals| locals.clone_ref()) + .map(|locals| locals.clone()) }) .unwrap_or_default() } From adcf654024d212b55507fcac5ee81da7a680e2c0 Mon Sep 17 00:00:00 2001 From: Ingmar Steen Date: Wed, 10 Sep 2025 14:14:27 +0200 Subject: [PATCH 4/8] Update CHANGELOG (and backfill 0.26.0) --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fb2086e..e38c81a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,12 @@ To see unreleased changes, please see the CHANGELOG on the main branch. +- Avoid attaching to the runtime when cloning TaskLocals by using std::sync::Arc. [#62](https://github.com/PyO3/pyo3-async-runtimes/pull/62) + +## [0.26.0] - 2025-09-02 + +- Bump to pyo3 0.26. [#54](https://github.com/PyO3/pyo3-async-runtimes/pull/54) + ## [0.25.0] - 2025-05-14 - Bump to pyo3 0.25. [#41](https://github.com/PyO3/pyo3-async-runtimes/pull/41) From c9a60b26a1f0dc4816b63d09274ada8a67c275b5 Mon Sep 17 00:00:00 2001 From: Ingmar Steen Date: Wed, 10 Sep 2025 16:44:40 +0200 Subject: [PATCH 5/8] Re-introduce clone_ref but deprecate it. --- src/lib.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index ea44520..f2478be 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -514,6 +514,12 @@ impl TaskLocals { pub fn context<'p>(&self, py: Python<'p>) -> Bound<'p, PyAny> { self.context.clone_ref(py).into_bound(py) } + + /// Create a clone of the TaskLocals. No longer uses the runtime, use `clone` instead. + #[deprecated(note = "please use `clone` instead")] + pub fn clone_ref(&self, _py: Python<'_>) -> Self { + self.clone() + } } impl Clone for TaskLocals { From 86d209ff66ba1bb6ab9d077d6fd4f9ef54eca79a Mon Sep 17 00:00:00 2001 From: Ingmar Steen Date: Wed, 10 Sep 2025 16:45:01 +0200 Subject: [PATCH 6/8] fmt --- src/async_std.rs | 6 +----- src/generic.rs | 7 +------ src/tokio.rs | 5 +---- 3 files changed, 3 insertions(+), 15 deletions(-) diff --git a/src/async_std.rs b/src/async_std.rs index 07a660a..3014ef6 100644 --- a/src/async_std.rs +++ b/src/async_std.rs @@ -91,11 +91,7 @@ impl ContextExt for AsyncStdRuntime { fn get_task_locals() -> Option { TASK_LOCALS - .try_with(|c| { - c.borrow() - .as_ref() - .map(|locals| locals.clone()) - }) + .try_with(|c| c.borrow().as_ref().map(|locals| locals.clone())) .unwrap_or_default() } } diff --git a/src/generic.rs b/src/generic.rs index 17d4fd8..22b01f7 100644 --- a/src/generic.rs +++ b/src/generic.rs @@ -1506,12 +1506,7 @@ struct SenderGlue { #[pymethods] impl SenderGlue { pub fn send(&mut self, item: Py) -> PyResult> { - Python::attach(|py| { - self.tx - .lock() - .unwrap() - .send(py, self.locals.clone(), item) - }) + Python::attach(|py| self.tx.lock().unwrap().send(py, self.locals.clone(), item)) } pub fn close(&mut self) -> PyResult<()> { self.tx.lock().unwrap().close() diff --git a/src/tokio.rs b/src/tokio.rs index 20ea10d..4865b93 100644 --- a/src/tokio.rs +++ b/src/tokio.rs @@ -109,10 +109,7 @@ impl ContextExt for TokioRuntime { fn get_task_locals() -> Option { TASK_LOCALS - .try_with(|c| { - c.get() - .map(|locals| locals.clone()) - }) + .try_with(|c| c.get().map(|locals| locals.clone())) .unwrap_or_default() } } From 906aebaf89f1c36aee540df2b1f13c67eb3e86d6 Mon Sep 17 00:00:00 2001 From: Ingmar Steen Date: Fri, 12 Sep 2025 10:03:39 +0200 Subject: [PATCH 7/8] Wrap TaskLocals in a single Arc. --- src/generic.rs | 12 ++++++------ src/lib.rs | 37 +++++++++++++++++++------------------ 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/src/generic.rs b/src/generic.rs index 22b01f7..fc60281 100644 --- a/src/generic.rs +++ b/src/generic.rs @@ -91,7 +91,7 @@ where R: ContextExt, { if let Some(locals) = R::get_task_locals() { - Ok(locals.event_loop.clone_ref(py).into_bound(py)) + Ok(locals.0.event_loop.clone_ref(py).into_bound(py)) } else { get_running_loop(py) } @@ -585,7 +585,7 @@ where { let (cancel_tx, cancel_rx) = oneshot::channel(); - let py_fut = create_future(locals.event_loop.bind(py).clone())?; + let py_fut = create_future(locals.0.event_loop.bind(py).clone())?; py_fut.call_method1( "add_done_callback", (PyDoneCallback { @@ -638,7 +638,7 @@ where get_panic_message(&e.into_panic()) ); let _ = set_result( - locals.event_loop.bind(py), + locals.0.event_loop.bind(py), future_tx2.bind(py), Err(RustPanic::new_err(panic_message)), ) @@ -990,7 +990,7 @@ where { let (cancel_tx, cancel_rx) = oneshot::channel(); - let py_fut = create_future(locals.event_loop.clone_ref(py).into_bound(py))?; + let py_fut = create_future(locals.0.event_loop.clone_ref(py).into_bound(py))?; py_fut.call_method1( "add_done_callback", (PyDoneCallback { @@ -1020,7 +1020,7 @@ where } let _ = set_result( - locals2.event_loop.bind(py), + locals2.0.event_loop.bind(py), future_tx1.bind(py), result.and_then(|val| val.into_py_any(py)), ) @@ -1043,7 +1043,7 @@ where get_panic_message(&e.into_panic()) ); let _ = set_result( - locals.event_loop.bind(py), + locals.0.event_loop.bind(py), future_tx2.bind(py), Err(RustPanic::new_err(panic_message)), ) diff --git a/src/lib.rs b/src/lib.rs index f2478be..327af8f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -469,22 +469,26 @@ fn copy_context(py: Python) -> PyResult> { contextvars(py)?.call_method0("copy_context") } -/// Task-local data to store for Python conversions. +/// Task-local inner structure. #[derive(Debug)] -pub struct TaskLocals { +struct TaskLocalsInner { /// Track the event loop of the Python task - event_loop: Arc>, + event_loop: Py, /// Track the contextvars of the Python task - context: Arc>, + context: Py, } +/// Task-local data to store for Python conversions. +#[derive(Debug)] +pub struct TaskLocals(Arc); + impl TaskLocals { /// At a minimum, TaskLocals must store the event loop. pub fn new(event_loop: Bound) -> Self { - Self { - context: Arc::new(event_loop.py().None()), - event_loop: Arc::new(event_loop.into()), - } + Self(Arc::new(TaskLocalsInner { + context: event_loop.py().None(), + event_loop: event_loop.into(), + })) } /// Construct TaskLocals with the event loop returned by `get_running_loop` @@ -494,10 +498,10 @@ impl TaskLocals { /// Manually provide the contextvars for the current task. pub fn with_context(self, context: Bound) -> Self { - Self { - context: Arc::new(context.into()), - ..self - } + Self(Arc::new(TaskLocalsInner { + event_loop: self.0.event_loop.clone_ref(context.py()), + context: context.into(), + })) } /// Capture the current task's contextvars @@ -507,12 +511,12 @@ impl TaskLocals { /// Get a reference to the event loop pub fn event_loop<'p>(&self, py: Python<'p>) -> Bound<'p, PyAny> { - self.event_loop.clone_ref(py).into_bound(py) + self.0.event_loop.clone_ref(py).into_bound(py) } /// Get a reference to the python context pub fn context<'p>(&self, py: Python<'p>) -> Bound<'p, PyAny> { - self.context.clone_ref(py).into_bound(py) + self.0.context.clone_ref(py).into_bound(py) } /// Create a clone of the TaskLocals. No longer uses the runtime, use `clone` instead. @@ -526,10 +530,7 @@ impl Clone for TaskLocals { /// Create a clone of the TaskLocals by incrementing the reference counters of the event loop and /// contextvars. fn clone(&self) -> Self { - Self { - event_loop: self.event_loop.clone(), - context: self.context.clone(), - } + Self(self.0.clone()) } } From b921141c10129e696724369310d03b6ca5f83611 Mon Sep 17 00:00:00 2001 From: Ingmar Steen Date: Fri, 12 Sep 2025 12:56:57 +0200 Subject: [PATCH 8/8] Fix comment on clone(). --- src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 327af8f..9d35ebe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -527,8 +527,8 @@ impl TaskLocals { } impl Clone for TaskLocals { - /// Create a clone of the TaskLocals by incrementing the reference counters of the event loop and - /// contextvars. + /// Create a clone of the TaskLocals by incrementing the reference counter of the inner + /// structure. fn clone(&self) -> Self { Self(self.0.clone()) }