Merged
Changes from 6 commits
1 change: 1 addition & 0 deletions newsfragments/4742.fixed.md
@@ -0,0 +1 @@
Use critical section in `PyByteArray::to_vec` on freethreaded build to replicate GIL-enabled "soundness".
145 changes: 143 additions & 2 deletions src/types/bytearray.rs
@@ -2,6 +2,7 @@ use crate::err::{PyErr, PyResult};
use crate::ffi_ptr_ext::FfiPtrExt;
use crate::instance::{Borrowed, Bound};
use crate::py_result_ext::PyResultExt;
+use crate::sync::with_critical_section;
use crate::types::any::PyAnyMethods;
use crate::{ffi, PyAny, Python};
use std::slice;
@@ -124,7 +125,7 @@ pub trait PyByteArrayMethods<'py>: crate::sealed::Sealed {
/// using the slice.
///
/// As a result, this slice should only be used for short-lived operations without executing any
-/// Python code, such as copying into a Vec.
+/// Python code, such as copying into a Vec. For free-threaded Python support, see also [`with_critical_section`].
///
/// # Examples
///
@@ -267,7 +268,7 @@ impl<'py> PyByteArrayMethods<'py> for Bound<'py, PyByteArray> {
}

fn to_vec(&self) -> Vec<u8> {
-unsafe { self.as_bytes() }.to_vec()
+with_critical_section(self, || unsafe { self.as_bytes() }.to_vec())
}

fn resize(&self, len: usize) -> PyResult<()> {
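
The fix itself is one line: the unsafe borrow in `to_vec` is now wrapped in a critical section, which on the free-threaded build locks the bytearray for the duration of the closure (and is a no-op where the GIL already provides this guarantee). A minimal sketch of the same pattern from user code; the helper name `copy_contents` is illustrative, not part of this diff:

```rust
use pyo3::sync::with_critical_section;
use pyo3::types::{PyByteArray, PyByteArrayMethods};
use pyo3::Bound;

// Hypothetical helper: copy a bytearray's contents while other threads
// may be resizing it on the free-threaded build.
fn copy_contents(arr: &Bound<'_, PyByteArray>) -> Vec<u8> {
    with_critical_section(arr, || {
        // SAFETY: no Python code runs inside the closure, and the
        // critical section keeps other threads from mutating `arr`,
        // so the borrowed slice stays valid for the copy.
        unsafe { arr.as_bytes() }.to_vec()
    })
}
```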
@@ -444,4 +445,144 @@ mod tests {
.is_instance_of::<PyValueError>(py));
})
}

// CPython 3.13t is unsound => test fails
#[cfg(any(Py_3_14, not(all(Py_3_13, Py_GIL_DISABLED))))]
Contributor: also needs to be skipped on emscripten

this failure mode happens often enough and wastes human time - it's probably worth adding a linter checking that all tests using threads are disabled on platforms that don't support spawning threads.

Contributor (Author):

> it's probably worth adding a linter

Do you mean upstream in clippy or something else?

Contributor: I'm not sure - can you write custom clippy lints like that? If you can, that's probably the easiest way. You might also be able to do it with a sufficiently advanced regex...

Also just to be clear, my comment above wasn't an ask for you to do that, if it came across that way 😄

Member (davidhewitt, Jul 25, 2025): I don't think we can write custom lints, so regex might be the most feasible option, albeit painful. I might just pretend the lint is harder to write than just losing the time repeatedly for now 🙈
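
For illustration, the gate the first comment asks for might look like the sketch below; `target_os = "emscripten"` is an assumption here, and pyo3's test suite may express the skip differently:

```rust
// Hypothetical combined gate: skip on interpreter builds where the test
// is known to fail (unsound CPython 3.13t) and on targets that cannot
// spawn threads (emscripten).
#[cfg(all(
    any(Py_3_14, not(all(Py_3_13, Py_GIL_DISABLED))),
    not(target_os = "emscripten")
))]
#[test]
fn hypothetical_threaded_test() {
    // std::thread-based test body would go here
}
```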

#[test]
fn test_data_integrity_in_critical_section() {
use crate::instance::Py;
use crate::sync::{with_critical_section, MutexExt};

use pyo3_ffi::c_str;

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::thread;
use std::thread::ScopedJoinHandle;
use std::time::Duration;

const SIZE: usize = 200_000_000;
const DATA_VALUE: u8 = 42;
const GARBAGE_VALUE: u8 = 13;

fn make_byte_array(py: Python<'_>, size: usize, value: u8) -> Bound<'_, PyByteArray> {
PyByteArray::new_with(py, size, |b| {
b.fill(value);
Ok(())
})
.unwrap()
}

let data: Mutex<Py<PyByteArray>> = Mutex::new(Python::attach(|py| {
make_byte_array(py, SIZE, DATA_VALUE).unbind()
}));

fn get_data<'py>(
data: &Mutex<Py<PyByteArray>>,
py: Python<'py>,
) -> Bound<'py, PyByteArray> {
data.lock_py_attached(py).unwrap().bind(py).clone()
}

fn set_data(data: &Mutex<Py<PyByteArray>>, new: Bound<'_, PyByteArray>) {
let py = new.py();
*data.lock_py_attached(py).unwrap() = new.unbind()
}

let running = AtomicBool::new(true);
let extending = AtomicBool::new(false);

// continuously extends and resets the bytearray in data
let worker1 = || {
while running.load(Ordering::Relaxed) {
Python::attach(|py| {
let byte_array = get_data(&data, py);
extending.store(true, Ordering::SeqCst);
byte_array
.call_method("extend", (&byte_array,), None)
.unwrap();
extending.store(false, Ordering::SeqCst);
set_data(&data, make_byte_array(py, SIZE, DATA_VALUE));
});
}
};

// continuously checks the integrity of bytearray in data
let worker2 = || {
while running.load(Ordering::Relaxed) {
if !extending.load(Ordering::SeqCst) {
// wait until we have a chance to read inconsistent state
continue;
}
Python::attach(|py| {
let read = get_data(&data, py);
if read.len() == SIZE {
// extend is still not done => wait even more
return;
}
with_critical_section(&read, || {
// SAFETY: we are in a critical section
// This is the whole point of the test: make sure that a
// critical section is sufficient to ensure that the data
// read is consistent.
unsafe {
let bytes = read.as_bytes();
assert!(bytes.iter().take(50).all(|v| *v == DATA_VALUE));
assert!(bytes.iter().rev().take(50).all(|v| *v == DATA_VALUE));
}
});
})
}
};

// write unrelated data to the memory for extra stress
let worker3 = || {
while running.load(Ordering::Relaxed) {
Python::attach(|py| {
let arrays = (0..5)
.map(|_| make_byte_array(py, SIZE, GARBAGE_VALUE)) // keep the arrays alive until the explicit drop below
.collect::<Vec<_>>();
drop(arrays);
py.run(c_str!(r#"import gc; gc.collect()"#), None, None)
.unwrap();
});
}
};

thread::scope(|s| {
let mut handle1 = Some(s.spawn(worker1));
let mut handle2 = Some(s.spawn(worker2));
let mut handle3 = Some(s.spawn(worker3));
let mut handles = [&mut handle1, &mut handle2, &mut handle3];

let t0 = std::time::Instant::now();
while t0.elapsed() < Duration::from_secs(10) {
Member: It might be helpful to reduce this to 0.5 secs (as long as that's still sufficient iterations), else this will have a noticeable impact on CI times.

Contributor (robsdedude, Author, Jul 20, 2025): Yeah, this is a good point. The problem with race conditions is that they're inherently non-deterministic, so there is no definitive number of "sufficient iterations". This is 100% a trade-off between CI time and how likely you want the test to be to catch the issue. Further, reducing this number to the smallest possible value on my machine such that it still satisfies my feeling of sufficiently accurate is absolutely no guarantee that this number will be a good fit for other machines (e.g., the CI runners).

That being said, on my machine it usually takes 1-2 seconds for it to fail against CPython 3.13t. But that number of course also depends on how fast a machine can actually run the iterations. So maybe I should just put a cap on the number of iterations and call it a day. Again, on my machine, each iteration amounts to just shy of 0.2 seconds.

Contributor (Author): So after a fair bit of fiddling, I saw one pass that took 17 rounds for the bug to occur. So I pinned the max number of iterations to 25 (adding some margin of error). This, however, still amounts to roughly 5 seconds. 🤷 I don't think this can be reasonably reduced without accepting that the test might not actually test what it sets out to test :/

Contributor: Does it reproduce with smaller test data? 200 MiB seems like a lot.

Contributor (robsdedude, Author, Jul 20, 2025): Same tradeoff. The smaller the size, the faster the extend, the lower the chance of intercepting it and catching the race condition. I'll try to see if I can further lower the size without lowering the accuracy too much. Again: all of it will be tailored to my specific machine. I have no idea how well the numbers I'll end up with will generalize.

Member: As long as there are enough iterations that eventually a CI run would trigger it (doesn't need to be every CI run) then I'm happy. If CI running all month would have zero chance of triggering it, then I'd think it wasn't enough :)

for handle in &mut handles {
if handle
.as_ref()
.map(ScopedJoinHandle::is_finished)
.unwrap_or(false)
{
handle
.take()
.unwrap()
.join()
.inspect_err(|_| running.store(false, Ordering::Relaxed))
.unwrap()
}
}
if handles.iter().all(|handle| handle.is_none()) {
break;
}
}
running.store(false, Ordering::Relaxed);
for handle in &mut handles {
if let Some(handle) = handle.take() {
handle.join().unwrap()
}
}
});
}
}
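
As a footnote to the review thread above: a sketch of the iteration cap the author describes (the cap of 25 and the 10-second budget come from the discussion; `poll_workers` is a hypothetical stand-in for the handle-polling loop body, and this exact shape is not part of the diff shown):

```rust
use std::time::{Duration, Instant};

// Bound the stress loop by a fixed iteration cap as well as wall-clock
// time, so slow machines stop early and fast machines do enough rounds.
fn run_stress_loop(mut poll_workers: impl FnMut() -> bool) {
    const MAX_ITERATIONS: usize = 25;
    let t0 = Instant::now();
    for _ in 0..MAX_ITERATIONS {
        if t0.elapsed() >= Duration::from_secs(10) {
            break; // out of time budget
        }
        if poll_workers() {
            break; // all workers finished (or one failed)
        }
    }
}
```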