Skip to content

Commit 417b70c

Browse files
committed
pythongh-120321: Make gi_yieldfrom thread-safe in free-threading build
Add a FRAME_SUSPENDED_YIELD_FROM_LOCKED state that acts as a brief lock, preventing other threads from transitioning the frame state while gen_getyieldfrom reads the yield-from object off the stack.
1 parent 6ea3f8c commit 417b70c

File tree

9 files changed

+107
-18
lines changed

9 files changed

+107
-18
lines changed

Include/internal/pycore_frame.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,16 @@ extern PyFrameObject* _PyFrame_New_NoTrack(PyCodeObject *code);
4444
/* other API */
4545

4646
typedef enum _framestate {
47-
FRAME_CREATED = -3,
48-
FRAME_SUSPENDED = -2,
49-
FRAME_SUSPENDED_YIELD_FROM = -1,
47+
FRAME_CREATED = -4,
48+
FRAME_SUSPENDED = -3,
49+
FRAME_SUSPENDED_YIELD_FROM = -2,
50+
FRAME_SUSPENDED_YIELD_FROM_LOCKED = -1,
5051
FRAME_EXECUTING = 0,
5152
FRAME_COMPLETED = 1,
5253
FRAME_CLEARED = 4
5354
} PyFrameState;
5455

55-
#define FRAME_STATE_SUSPENDED(S) ((S) == FRAME_SUSPENDED || (S) == FRAME_SUSPENDED_YIELD_FROM)
56+
#define FRAME_STATE_SUSPENDED(S) ((S) >= FRAME_SUSPENDED && (S) <= FRAME_SUSPENDED_YIELD_FROM_LOCKED)
5657
#define FRAME_STATE_FINISHED(S) ((S) >= FRAME_COMPLETED)
5758

5859
#ifdef __cplusplus

Include/internal/pycore_lock.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ PyMutex_LockFlags(PyMutex *m, _PyLockFlags flags)
7070
// error messages) otherwise returns 0.
7171
extern int _PyMutex_TryUnlock(PyMutex *m);
7272

73+
// Yield the processor to other threads (e.g., sched_yield).
74+
extern void _Py_yield(void);
75+
7376

7477
// PyEvent is a one-time event notification
7578
typedef struct {

Lib/test/support/threading_helper.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -250,21 +250,32 @@ def requires_working_threading(*, module=False):
250250
return unittest.skipUnless(can_start_thread, msg)
251251

252252

253-
def run_concurrently(worker_func, nthreads, args=(), kwargs={}):
253+
def run_concurrently(worker_func, nthreads=None, args=(), kwargs={}):
254254
"""
255-
Run the worker function concurrently in multiple threads.
255+
Run the worker function(s) concurrently in multiple threads.
256+
257+
If `worker_func` is a single callable, it is used for all threads.
258+
If it is a list of callables, each callable is used for one thread.
256259
"""
260+
from collections.abc import Iterable
261+
262+
if nthreads is None:
263+
nthreads = len(worker_func)
264+
if not isinstance(worker_func, Iterable):
265+
worker_func = [worker_func] * nthreads
266+
assert len(worker_func) == nthreads
267+
257268
barrier = threading.Barrier(nthreads)
258269

259-
def wrapper_func(*args, **kwargs):
270+
def wrapper_func(func, *args, **kwargs):
260271
# Wait for all threads to reach this point before proceeding.
261272
barrier.wait()
262-
worker_func(*args, **kwargs)
273+
func(*args, **kwargs)
263274

264275
with catch_threading_exception() as cm:
265276
workers = [
266-
threading.Thread(target=wrapper_func, args=args, kwargs=kwargs)
267-
for _ in range(nthreads)
277+
threading.Thread(target=wrapper_func, args=(func, *args), kwargs=kwargs)
278+
for func in worker_func
268279
]
269280
with start_threads(workers):
270281
pass

Lib/test/test_free_threading/test_generators.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import concurrent.futures
2+
import itertools
3+
import threading
24
import unittest
35
from threading import Barrier
46
from unittest import TestCase
@@ -120,3 +122,38 @@ def drive_generator(g):
120122

121123
g = gen()
122124
threading_helper.run_concurrently(drive_generator, self.NUM_THREADS, args=(g,))
125+
126+
def test_concurrent_gi_yieldfrom(self):
127+
def gen_yield_from():
128+
yield from itertools.count()
129+
130+
g = gen_yield_from()
131+
next(g) # Put in FRAME_SUSPENDED_YIELD_FROM state
132+
133+
def read_yieldfrom(gen):
134+
for _ in range(10000):
135+
self.assertIsNotNone(gen.gi_yieldfrom)
136+
137+
threading_helper.run_concurrently(read_yieldfrom, self.NUM_THREADS, args=(g,))
138+
139+
def test_gi_yieldfrom_close_race(self):
140+
def gen_yield_from():
141+
yield from itertools.count()
142+
143+
g = gen_yield_from()
144+
next(g)
145+
146+
done = threading.Event()
147+
148+
def reader():
149+
while not done.is_set():
150+
g.gi_yieldfrom
151+
152+
def closer():
153+
try:
154+
g.close()
155+
except ValueError:
156+
pass
157+
done.set()
158+
159+
threading_helper.run_concurrently([reader, closer])
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Made :attr:`~generator.gi_yieldfrom` thread-safe in the free-threading build
2+
by using a lightweight lock on the frame state.

Objects/genobject.c

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include "pycore_gc.h" // _PyGC_CLEAR_FINALIZED()
1111
#include "pycore_genobject.h" // _PyGen_SetStopIterationValue()
1212
#include "pycore_interpframe.h" // _PyFrame_GetCode()
13+
#include "pycore_lock.h" // _Py_yield()
1314
#include "pycore_modsupport.h" // _PyArg_CheckPositional()
1415
#include "pycore_object.h" // _PyObject_GC_UNTRACK()
1516
#include "pycore_opcode_utils.h" // RESUME_AFTER_YIELD_FROM
@@ -44,6 +45,18 @@ static PyObject* async_gen_athrow_new(PyAsyncGenObject *, PyObject *);
4445
((gen)->gi_frame_state = (state), true)
4546
#endif
4647

48+
// Wait for any in-progress gi_yieldfrom read to complete.
49+
static inline void
50+
gen_yield_from_lock_wait(PyGenObject *gen, int8_t *frame_state)
51+
{
52+
#ifdef Py_GIL_DISABLED
53+
while (*frame_state == FRAME_SUSPENDED_YIELD_FROM_LOCKED) {
54+
_Py_yield();
55+
*frame_state = _Py_atomic_load_int8_relaxed(&gen->gi_frame_state);
56+
}
57+
#endif
58+
}
59+
4760

4861
static const char *NON_INIT_CORO_MSG = "can't send non-None value to a "
4962
"just-started coroutine";
@@ -318,6 +331,8 @@ gen_send_ex(PyGenObject *gen, PyObject *arg, PyObject **presult)
318331
*presult = NULL;
319332
int8_t frame_state = FT_ATOMIC_LOAD_INT8_RELAXED(gen->gi_frame_state);
320333
do {
334+
gen_yield_from_lock_wait(gen, &frame_state);
335+
321336
if (frame_state == FRAME_CREATED && arg && arg != Py_None) {
322337
const char *msg = "can't send non-None value to a "
323338
"just-started generator";
@@ -452,6 +467,8 @@ gen_close(PyObject *self, PyObject *args)
452467

453468
int8_t frame_state = FT_ATOMIC_LOAD_INT8_RELAXED(gen->gi_frame_state);
454469
do {
470+
gen_yield_from_lock_wait(gen, &frame_state);
471+
455472
if (frame_state == FRAME_CREATED) {
456473
// && (1) to avoid -Wunreachable-code warning on Clang
457474
if (!_Py_GEN_TRY_SET_FRAME_STATE(gen, frame_state, FRAME_CLEARED) && (1)) {
@@ -470,9 +487,7 @@ gen_close(PyObject *self, PyObject *args)
470487
return NULL;
471488
}
472489

473-
assert(frame_state == FRAME_SUSPENDED_YIELD_FROM ||
474-
frame_state == FRAME_SUSPENDED);
475-
490+
assert(FRAME_STATE_SUSPENDED(frame_state));
476491
} while (!_Py_GEN_TRY_SET_FRAME_STATE(gen, frame_state, FRAME_EXECUTING));
477492

478493
int err = 0;
@@ -614,6 +629,8 @@ _gen_throw(PyGenObject *gen, int close_on_genexit,
614629
{
615630
int8_t frame_state = FT_ATOMIC_LOAD_INT8_RELAXED(gen->gi_frame_state);
616631
do {
632+
gen_yield_from_lock_wait(gen, &frame_state);
633+
617634
if (frame_state == FRAME_EXECUTING) {
618635
gen_raise_already_executing_error(gen);
619636
return NULL;
@@ -876,12 +893,25 @@ static PyObject *
876893
gen_getyieldfrom(PyObject *self, void *Py_UNUSED(ignored))
877894
{
878895
PyGenObject *gen = _PyGen_CAST(self);
879-
int8_t frame_state = FT_ATOMIC_LOAD_INT8_RELAXED(gen->gi_frame_state);
896+
#ifdef Py_GIL_DISABLED
897+
int8_t frame_state = _Py_atomic_load_int8_relaxed(&gen->gi_frame_state);
898+
do {
899+
gen_yield_from_lock_wait(gen, &frame_state);
900+
if (frame_state != FRAME_SUSPENDED_YIELD_FROM) {
901+
Py_RETURN_NONE;
902+
}
903+
} while (!_Py_GEN_TRY_SET_FRAME_STATE(gen, frame_state, FRAME_SUSPENDED_YIELD_FROM_LOCKED));
904+
905+
PyObject *result = PyStackRef_AsPyObjectNew(_PyFrame_StackPeek(&gen->gi_iframe));
906+
_Py_atomic_store_int8_release(&gen->gi_frame_state, FRAME_SUSPENDED_YIELD_FROM);
907+
return result;
908+
#else
909+
int8_t frame_state = gen->gi_frame_state;
880910
if (frame_state != FRAME_SUSPENDED_YIELD_FROM) {
881911
Py_RETURN_NONE;
882912
}
883-
// TODO: still not thread-safe with free threading
884913
return PyStackRef_AsPyObjectNew(_PyFrame_StackPeek(&gen->gi_iframe));
914+
#endif
885915
}
886916

887917

Python/ceval.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3391,7 +3391,9 @@ _PyEval_GetAwaitable(PyObject *iterable, int oparg)
33913391
else if (PyCoro_CheckExact(iter)) {
33923392
PyCoroObject *coro = (PyCoroObject *)iter;
33933393
int8_t frame_state = FT_ATOMIC_LOAD_INT8_RELAXED(coro->cr_frame_state);
3394-
if (frame_state == FRAME_SUSPENDED_YIELD_FROM) {
3394+
if (frame_state == FRAME_SUSPENDED_YIELD_FROM ||
3395+
frame_state == FRAME_SUSPENDED_YIELD_FROM_LOCKED)
3396+
{
33953397
/* `iter` is a coroutine object that is being awaited. */
33963398
Py_CLEAR(iter);
33973399
_PyErr_SetString(PyThreadState_GET(), PyExc_RuntimeError,

Python/ceval_macros.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -522,19 +522,22 @@ gen_try_set_executing(PyGenObject *gen)
522522
#ifdef Py_GIL_DISABLED
523523
if (!_PyObject_IsUniquelyReferenced((PyObject *)gen)) {
524524
int8_t frame_state = _Py_atomic_load_int8_relaxed(&gen->gi_frame_state);
525-
while (frame_state < FRAME_EXECUTING) {
525+
while (frame_state < FRAME_SUSPENDED_YIELD_FROM_LOCKED) {
526526
if (_Py_atomic_compare_exchange_int8(&gen->gi_frame_state,
527527
&frame_state,
528528
FRAME_EXECUTING)) {
529529
return true;
530530
}
531531
}
532+
// NB: We return false for FRAME_SUSPENDED_YIELD_FROM_LOCKED as well.
533+
// That case is rare enough that we can just handle it in the deopt.
532534
return false;
533535
}
534536
#endif
535537
// Use faster non-atomic modifications in the GIL-enabled build and when
536538
// the object is uniquely referenced in the free-threaded build.
537539
if (gen->gi_frame_state < FRAME_EXECUTING) {
540+
assert(gen->gi_frame_state != FRAME_SUSPENDED_YIELD_FROM_LOCKED);
538541
gen->gi_frame_state = FRAME_EXECUTING;
539542
return true;
540543
}

Python/lock.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ struct mutex_entry {
4040
int handed_off;
4141
};
4242

43-
static void
43+
void
4444
_Py_yield(void)
4545
{
4646
#ifdef MS_WINDOWS

0 commit comments

Comments
 (0)