Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ To see unreleased changes, please see the CHANGELOG on the main branch.

<!-- towncrier release notes start -->

- Avoid attaching to the runtime when cloning TaskLocals by using std::sync::Arc. [#62](https://github.com/PyO3/pyo3-async-runtimes/pull/62)

## [0.26.0] - 2025-09-02

- Bump to pyo3 0.26. [#54](https://github.com/PyO3/pyo3-async-runtimes/pull/54)

## [0.25.0] - 2025-05-14

- Bump to pyo3 0.25. [#41](https://github.com/PyO3/pyo3-async-runtimes/pull/41)
Expand Down
6 changes: 1 addition & 5 deletions src/async_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,7 @@ impl ContextExt for AsyncStdRuntime {

fn get_task_locals() -> Option<TaskLocals> {
TASK_LOCALS
.try_with(|c| {
c.borrow()
.as_ref()
.map(|locals| Python::attach(|py| locals.clone_ref(py)))
})
.try_with(|c| c.borrow().as_ref().map(|locals| locals.clone()))
.unwrap_or_default()
}
}
Expand Down
27 changes: 11 additions & 16 deletions src/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ where
R: ContextExt,
{
if let Some(locals) = R::get_task_locals() {
Ok(locals.event_loop.into_bound(py))
Ok(locals.0.event_loop.clone_ref(py).into_bound(py))
} else {
get_running_loop(py)
}
Expand Down Expand Up @@ -585,7 +585,7 @@ where
{
let (cancel_tx, cancel_rx) = oneshot::channel();

let py_fut = create_future(locals.event_loop.bind(py).clone())?;
let py_fut = create_future(locals.0.event_loop.bind(py).clone())?;
py_fut.call_method1(
"add_done_callback",
(PyDoneCallback {
Expand All @@ -597,11 +597,11 @@ where
let future_tx2 = future_tx1.clone_ref(py);

R::spawn(async move {
let locals2 = Python::attach(|py| locals.clone_ref(py));
let locals2 = locals.clone();

if let Err(e) = R::spawn(async move {
let result = R::scope(
Python::attach(|py| locals2.clone_ref(py)),
locals2.clone(),
Cancellable::new_with_cancel_rx(fut, cancel_rx),
)
.await;
Expand Down Expand Up @@ -638,7 +638,7 @@ where
get_panic_message(&e.into_panic())
);
let _ = set_result(
locals.event_loop.bind(py),
locals.0.event_loop.bind(py),
future_tx2.bind(py),
Err(RustPanic::new_err(panic_message)),
)
Expand Down Expand Up @@ -990,7 +990,7 @@ where
{
let (cancel_tx, cancel_rx) = oneshot::channel();

let py_fut = create_future(locals.event_loop.clone_ref(py).into_bound(py))?;
let py_fut = create_future(locals.0.event_loop.clone_ref(py).into_bound(py))?;
py_fut.call_method1(
"add_done_callback",
(PyDoneCallback {
Expand All @@ -1002,11 +1002,11 @@ where
let future_tx2 = future_tx1.clone_ref(py);

R::spawn_local(async move {
let locals2 = Python::attach(|py| locals.clone_ref(py));
let locals2 = locals.clone();

if let Err(e) = R::spawn_local(async move {
let result = R::scope_local(
Python::attach(|py| locals2.clone_ref(py)),
locals2.clone(),
Cancellable::new_with_cancel_rx(fut, cancel_rx),
)
.await;
Expand All @@ -1020,7 +1020,7 @@ where
}

let _ = set_result(
locals2.event_loop.bind(py),
locals2.0.event_loop.bind(py),
future_tx1.bind(py),
result.and_then(|val| val.into_py_any(py)),
)
Expand All @@ -1043,7 +1043,7 @@ where
get_panic_message(&e.into_panic())
);
let _ = set_result(
locals.event_loop.bind(py),
locals.0.event_loop.bind(py),
future_tx2.bind(py),
Err(RustPanic::new_err(panic_message)),
)
Expand Down Expand Up @@ -1506,12 +1506,7 @@ struct SenderGlue {
#[pymethods]
impl SenderGlue {
pub fn send(&mut self, item: Py<PyAny>) -> PyResult<Py<PyAny>> {
Python::attach(|py| {
self.tx
.lock()
.unwrap()
.send(py, self.locals.clone_ref(py), item)
})
Python::attach(|py| self.tx.lock().unwrap().send(py, self.locals.clone(), item))
}
pub fn close(&mut self) -> PyResult<()> {
self.tx.lock().unwrap().close()
Expand Down
52 changes: 31 additions & 21 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
//! let locals = pyo3_async_runtimes::TaskLocals::with_running_loop(py)?.copy_context(py)?;
//!
//! // Convert the async move { } block to a Python awaitable
//! pyo3_async_runtimes::tokio::future_into_py_with_locals(py, locals.clone_ref(py), async move {
//! pyo3_async_runtimes::tokio::future_into_py_with_locals(py, locals.clone(), async move {
//! let py_sleep = Python::attach(|py| {
//! // Sometimes we need to call other async Python functions within
//! // this future. In order for this to work, we need to track the
Expand Down Expand Up @@ -162,9 +162,9 @@
//!
//! pyo3_async_runtimes::tokio::future_into_py_with_locals(
//! py,
//! locals.clone_ref(py),
//! locals.clone(),
//! // Store the current locals in task-local data
//! pyo3_async_runtimes::tokio::scope(locals.clone_ref(py), async move {
//! pyo3_async_runtimes::tokio::scope(locals.clone(), async move {
//! let py_sleep = Python::attach(|py| {
//! pyo3_async_runtimes::into_future_with_locals(
//! // Now we can get the current locals through task-local data
Expand All @@ -189,9 +189,9 @@
//!
//! pyo3_async_runtimes::tokio::future_into_py_with_locals(
//! py,
//! locals.clone_ref(py),
//! locals.clone(),
//! // Store the current locals in task-local data
//! pyo3_async_runtimes::tokio::scope(locals.clone_ref(py), async move {
//! pyo3_async_runtimes::tokio::scope(locals.clone(), async move {
//! let py_sleep = Python::attach(|py| {
//! pyo3_async_runtimes::into_future_with_locals(
//! &pyo3_async_runtimes::tokio::get_current_locals(py)?,
Expand Down Expand Up @@ -395,6 +395,7 @@ pub mod doc_test {
}

use std::future::Future;
use std::sync::Arc;

use futures::channel::oneshot;
use pyo3::{call::PyCallArgs, prelude::*, sync::PyOnceLock, types::PyDict};
Expand Down Expand Up @@ -468,22 +469,26 @@ fn copy_context(py: Python) -> PyResult<Bound<PyAny>> {
contextvars(py)?.call_method0("copy_context")
}

/// Task-local data to store for Python conversions.
/// Task-local inner structure.
#[derive(Debug)]
pub struct TaskLocals {
struct TaskLocalsInner {
/// Track the event loop of the Python task
event_loop: Py<PyAny>,
/// Track the contextvars of the Python task
context: Py<PyAny>,
}

/// Task-local data to store for Python conversions.
#[derive(Debug)]
pub struct TaskLocals(Arc<TaskLocalsInner>);

impl TaskLocals {
/// At a minimum, TaskLocals must store the event loop.
pub fn new(event_loop: Bound<PyAny>) -> Self {
Self {
Self(Arc::new(TaskLocalsInner {
context: event_loop.py().None(),
event_loop: event_loop.into(),
}
}))
}

/// Construct TaskLocals with the event loop returned by `get_running_loop`
Expand All @@ -493,10 +498,10 @@ impl TaskLocals {

/// Manually provide the contextvars for the current task.
pub fn with_context(self, context: Bound<PyAny>) -> Self {
Self {
Self(Arc::new(TaskLocalsInner {
event_loop: self.0.event_loop.clone_ref(context.py()),
context: context.into(),
..self
}
}))
}

/// Capture the current task's contextvars
Expand All @@ -506,21 +511,26 @@ impl TaskLocals {

/// Get a reference to the event loop
pub fn event_loop<'p>(&self, py: Python<'p>) -> Bound<'p, PyAny> {
self.event_loop.clone_ref(py).into_bound(py)
self.0.event_loop.clone_ref(py).into_bound(py)
}

/// Get a reference to the python context
pub fn context<'p>(&self, py: Python<'p>) -> Bound<'p, PyAny> {
self.context.clone_ref(py).into_bound(py)
self.0.context.clone_ref(py).into_bound(py)
}

/// Create a clone of the TaskLocals by incrementing the reference counters of the event loop and
/// contextvars.
pub fn clone_ref(&self, py: Python<'_>) -> Self {
Self {
event_loop: self.event_loop.clone_ref(py),
context: self.context.clone_ref(py),
}
/// Create a clone of the TaskLocals. No longer uses the runtime, use `clone` instead.
#[deprecated(note = "please use `clone` instead")]
pub fn clone_ref(&self, _py: Python<'_>) -> Self {
self.clone()
}
}

impl Clone for TaskLocals {
/// Create a clone of the TaskLocals by incrementing the reference counter of the inner
/// structure.
fn clone(&self) -> Self {
Self(self.0.clone())
}
}

Expand Down
5 changes: 1 addition & 4 deletions src/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,7 @@ impl ContextExt for TokioRuntime {

fn get_task_locals() -> Option<TaskLocals> {
TASK_LOCALS
.try_with(|c| {
c.get()
.map(|locals| Python::attach(|py| locals.clone_ref(py)))
})
.try_with(|c| c.get().map(|locals| locals.clone()))
.unwrap_or_default()
}
}
Expand Down
Loading