diff --git a/Include/cpython/pystate.h b/Include/cpython/pystate.h index bd9d8aaefe5400..ac8798ff6129a0 100644 --- a/Include/cpython/pystate.h +++ b/Include/cpython/pystate.h @@ -107,6 +107,7 @@ struct _ts { # define _PyThreadState_WHENCE_THREADING 3 # define _PyThreadState_WHENCE_GILSTATE 4 # define _PyThreadState_WHENCE_EXEC 5 +# define _PyThreadState_WHENCE_THREADING_DAEMON 6 #endif /* Currently holds the GIL. Must be its own field to avoid data races */ diff --git a/Lib/test/test_atexit.py b/Lib/test/test_atexit.py index eb01da6e88a8bc..66142a108d5d93 100644 --- a/Lib/test/test_atexit.py +++ b/Lib/test/test_atexit.py @@ -79,6 +79,62 @@ def thready(): # want them to affect the rest of the tests. script_helper.assert_python_ok("-c", textwrap.dedent(source)) + @threading_helper.requires_working_threading() + def test_thread_created_in_atexit(self): + source = """if True: + import atexit + import threading + import time + + + def run(): + print(24) + time.sleep(1) + print(42) + + @atexit.register + def start_thread(): + threading.Thread(target=run).start() + """ + return_code, stdout, stderr = script_helper.assert_python_ok("-c", source) + self.assertEqual(return_code, 0) + self.assertEqual(stdout, f"24{os.linesep}42{os.linesep}".encode("utf-8")) + self.assertEqual(stderr, b"") + + @threading_helper.requires_working_threading() + @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") + def test_thread_created_in_atexit_subinterpreter(self): + try: + from concurrent import interpreters + except ImportError: + self.skipTest("subinterpreters are not available") + + read, write = os.pipe() + source = f"""if True: + import atexit + import threading + import time + import os + + def run(): + os.write({write}, b'spanish') + time.sleep(1) + os.write({write}, b'inquisition') + + @atexit.register + def start_thread(): + threading.Thread(target=run).start() + """ + interp = interpreters.create() + try: + interp.exec(source) + + # Close the interpreter to invoke atexit callbacks + interp.close() + self.assertEqual(os.read(read, 100), b"spanishinquisition") + finally: + os.close(read) + os.close(write) @support.cpython_only class SubinterpreterTest(unittest.TestCase): diff --git a/Lib/test/test_capi/test_misc.py b/Lib/test/test_capi/test_misc.py index ef950f5df04ad3..fe1287167d3e74 100644 --- a/Lib/test/test_capi/test_misc.py +++ b/Lib/test/test_capi/test_misc.py @@ -22,6 +22,7 @@ from test import support from test.support import MISSING_C_DOCSTRINGS from test.support import import_helper +from test.support import script_helper from test.support import threading_helper from test.support import warnings_helper from test.support import requires_limited_api @@ -1641,6 +1642,36 @@ def subthread(): self.assertEqual(actual, int(interpid)) + @threading_helper.requires_working_threading() + def test_pending_call_creates_thread(self): + source = """ + import _testinternalcapi + import threading + import time + + + def output(): + print(24) + time.sleep(1) + print(42) + + + def callback(): + threading.Thread(target=output).start() + + + def create_pending_call(): + time.sleep(1) + _testinternalcapi.simple_pending_call(callback) + + + threading.Thread(target=create_pending_call).start() + """ + return_code, stdout, stderr = script_helper.assert_python_ok('-c', textwrap.dedent(source)) + self.assertEqual(return_code, 0) + self.assertEqual(stdout, f"24{os.linesep}42{os.linesep}".encode("utf-8")) + self.assertEqual(stderr, b"") + class SubinterpreterTest(unittest.TestCase): @@ -1949,6 +1980,41 @@ def test_module_state_shared_in_global(self): subinterp_attr_id = os.read(r, 100) self.assertEqual(main_attr_id, subinterp_attr_id) + @threading_helper.requires_working_threading() + @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") + @requires_subinterpreters + def test_pending_call_creates_thread_subinterpreter(self): + interpreters = import_helper.import_module("concurrent.interpreters") + r, w = os.pipe() + source = f"""if True: + import _testinternalcapi + import threading + import time + import os + + + def output(): + time.sleep(1) + os.write({w}, b"x") + + + def callback(): + threading.Thread(target=output).start() + + + def create_pending_call(): + time.sleep(1) + _testinternalcapi.simple_pending_call(callback) + + + threading.Thread(target=create_pending_call).start() + """ + interp = interpreters.create() + interp.exec(source) + interp.close() + data = os.read(r, 1) + self.assertEqual(data, b"x") + @requires_subinterpreters class InterpreterConfigTests(unittest.TestCase): diff --git a/Lib/threading.py b/Lib/threading.py index b3e24fe7ddaa0c..fac88d8b20c60d 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -1557,8 +1557,9 @@ def _shutdown(): # normally - that won't happen until the interpreter is nearly dead. So # mark it done here. if _main_thread._os_thread_handle.is_done() and _is_main_interpreter(): - # _shutdown() was already called - return + # _shutdown() was already called, but threads might have started + # in the meantime. + return _thread_shutdown() global _SHUTTING_DOWN _SHUTTING_DOWN = True diff --git a/Misc/NEWS.d/next/Core_and_Builtins/2025-06-26-18-44-34.gh-issue-136003.sln51d.rst b/Misc/NEWS.d/next/Core_and_Builtins/2025-06-26-18-44-34.gh-issue-136003.sln51d.rst new file mode 100644 index 00000000000000..a1344d2264f665 --- /dev/null +++ b/Misc/NEWS.d/next/Core_and_Builtins/2025-06-26-18-44-34.gh-issue-136003.sln51d.rst @@ -0,0 +1,3 @@ +Fix :class:`threading.Thread` objects becoming incorrectly daemon when +created from an :mod:`atexit` callback or a pending call +(:c:func:`Py_AddPendingCall`). diff --git a/Modules/_testinternalcapi.c b/Modules/_testinternalcapi.c index 243c7346576fc6..7aa63f913af335 100644 --- a/Modules/_testinternalcapi.c +++ b/Modules/_testinternalcapi.c @@ -2376,6 +2376,16 @@ emscripten_set_up_async_input_device(PyObject *self, PyObject *Py_UNUSED(ignored } #endif +static PyObject * +simple_pending_call(PyObject *self, PyObject *callable) +{ + if (_PyEval_AddPendingCall(_PyInterpreterState_GET(), _pending_callback, Py_NewRef(callable), 0) < 0) { + return NULL; + } + + Py_RETURN_NONE; +} + static PyMethodDef module_functions[] = { {"get_configs", get_configs, METH_NOARGS}, {"get_recursion_depth", get_recursion_depth, METH_NOARGS}, @@ -2481,6 +2491,7 @@ static PyMethodDef module_functions[] = { #ifdef __EMSCRIPTEN__ {"emscripten_set_up_async_input_device", emscripten_set_up_async_input_device, METH_NOARGS}, #endif + {"simple_pending_call", simple_pending_call, METH_O}, {NULL, NULL} /* sentinel */ }; diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index c6d07b1360711c..070732aba860b2 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -429,7 +429,7 @@ force_done(void *arg) static int ThreadHandle_start(ThreadHandle *self, PyObject *func, PyObject *args, - PyObject *kwargs) + PyObject *kwargs, int daemon) { // Mark the handle as starting to prevent any other threads from doing so PyMutex_Lock(&self->mutex); @@ -453,7 +453,8 @@ ThreadHandle_start(ThreadHandle *self, PyObject *func, PyObject *args, goto start_failed; } PyInterpreterState *interp = _PyInterpreterState_GET(); - boot->tstate = _PyThreadState_New(interp, _PyThreadState_WHENCE_THREADING); + uint8_t whence = daemon ? _PyThreadState_WHENCE_THREADING_DAEMON : _PyThreadState_WHENCE_THREADING; + boot->tstate = _PyThreadState_New(interp, whence); if (boot->tstate == NULL) { PyMem_RawFree(boot); if (!PyErr_Occurred()) { @@ -1916,7 +1917,7 @@ do_start_new_thread(thread_module_state *state, PyObject *func, PyObject *args, add_to_shutdown_handles(state, handle); } - if (ThreadHandle_start(handle, func, args, kwargs) < 0) { + if (ThreadHandle_start(handle, func, args, kwargs, daemon) < 0) { if (!daemon) { remove_from_shutdown_handles(handle); } diff --git a/Python/pylifecycle.c b/Python/pylifecycle.c index 8920784a7b7eac..b930e2e2e43e33 100644 --- a/Python/pylifecycle.c +++ b/Python/pylifecycle.c @@ -2013,6 +2013,133 @@ resolve_final_tstate(_PyRuntimeState *runtime) return main_tstate; } +#ifdef Py_GIL_DISABLED +#define ASSERT_WORLD_STOPPED(interp) assert(interp->runtime->stoptheworld.world_stopped) +#else +#define ASSERT_WORLD_STOPPED(interp) +#endif + +static int +interp_has_threads(PyInterpreterState *interp) +{ + /* This needs to check for non-daemon threads only, otherwise we get stuck + * in an infinite loop. */ + assert(interp != NULL); + ASSERT_WORLD_STOPPED(interp); + assert(interp->threads.head != NULL); + if (interp->threads.head->next == NULL) { + // No other threads active, easy way out. + return 0; + } + + // We don't have to worry about locking this because the + // world is stopped. + _Py_FOR_EACH_TSTATE_UNLOCKED(interp, tstate) { + if (tstate->_whence == _PyThreadState_WHENCE_THREADING) { + return 1; + } + } + + return 0; +} + +static int +interp_has_pending_calls(PyInterpreterState *interp) +{ + assert(interp != NULL); + ASSERT_WORLD_STOPPED(interp); + return interp->ceval.pending.npending != 0; +} + +static int +interp_has_atexit_callbacks(PyInterpreterState *interp) +{ + assert(interp != NULL); + assert(interp->atexit.callbacks != NULL); + ASSERT_WORLD_STOPPED(interp); + assert(PyList_CheckExact(interp->atexit.callbacks)); + return PyList_GET_SIZE(interp->atexit.callbacks) != 0; +} + +static int +runtime_has_subinterpreters(_PyRuntimeState *runtime) +{ + assert(runtime != NULL); + HEAD_LOCK(runtime); + PyInterpreterState *interp = runtime->interpreters.head; + HEAD_UNLOCK(runtime); + return interp->next != NULL; +} + +static void +make_pre_finalization_calls(PyThreadState *tstate, int subinterpreters) +{ + assert(tstate != NULL); + PyInterpreterState *interp = tstate->interp; + /* Each of these functions can start one another, e.g. a pending call + * could start a thread or vice versa. To ensure that we properly clean + * call everything, we run these in a loop until none of them run anything. */ + for (;;) { + assert(!interp->runtime->stoptheworld.world_stopped); + + // Wrap up existing "threading"-module-created, non-daemon threads. + wait_for_thread_shutdown(tstate); + + // Make any remaining pending calls. + _Py_FinishPendingCalls(tstate); + + /* The interpreter is still entirely intact at this point, and the + * exit funcs may be relying on that. In particular, if some thread + * or exit func is still waiting to do an import, the import machinery + * expects Py_IsInitialized() to return true. So don't say the + * runtime is uninitialized until after the exit funcs have run. + * Note that Threading.py uses an exit func to do a join on all the + * threads created thru it, so this also protects pending imports in + * the threads created via Threading. + */ + + _PyAtExit_Call(tstate->interp); + + if (subinterpreters) { + /* Clean up any lingering subinterpreters. + + Two preconditions need to be met here: + + - This has to happen before _PyRuntimeState_SetFinalizing is + called, or else threads might get prematurely blocked. + - The world must not be stopped, as finalizers can run. + */ + finalize_subinterpreters(); + } + + + /* Stop the world to prevent other threads from creating threads or + * atexit callbacks. On the default build, this is simply locked by + * the GIL. For pending calls, we acquire the dedicated mutex, because + * Py_AddPendingCall() can be called without an attached thread state. + */ + + PyMutex_Lock(&interp->ceval.pending.mutex); + // XXX Why does _PyThreadState_DeleteList() rely on all interpreters + // being stopped? + _PyEval_StopTheWorldAll(interp->runtime); + int has_subinterpreters = subinterpreters + ? runtime_has_subinterpreters(interp->runtime) + : 0; + int should_continue = (interp_has_threads(interp) + || interp_has_atexit_callbacks(interp) + || interp_has_pending_calls(interp) + || has_subinterpreters); + if (!should_continue) { + break; + } + _PyEval_StartTheWorldAll(interp->runtime); + PyMutex_Unlock(&interp->ceval.pending.mutex); + } + assert(PyMutex_IsLocked(&interp->ceval.pending.mutex)); + ASSERT_WORLD_STOPPED(interp); +} + static int _Py_Finalize(_PyRuntimeState *runtime) { @@ -2029,33 +2156,8 @@ _Py_Finalize(_PyRuntimeState *runtime) // Block some operations. tstate->interp->finalizing = 1; - // Wrap up existing "threading"-module-created, non-daemon threads. - wait_for_thread_shutdown(tstate); - - // Make any remaining pending calls. - _Py_FinishPendingCalls(tstate); - - /* The interpreter is still entirely intact at this point, and the - * exit funcs may be relying on that. In particular, if some thread - * or exit func is still waiting to do an import, the import machinery - * expects Py_IsInitialized() to return true. So don't say the - * runtime is uninitialized until after the exit funcs have run. - * Note that Threading.py uses an exit func to do a join on all the - * threads created thru it, so this also protects pending imports in - * the threads created via Threading. - */ - - _PyAtExit_Call(tstate->interp); - - /* Clean up any lingering subinterpreters. - - Two preconditions need to be met here: - - - This has to happen before _PyRuntimeState_SetFinalizing is - called, or else threads might get prematurely blocked. - - The world must not be stopped, as finalizers can run. - */ - finalize_subinterpreters(); + // This call stops the world and takes the pending calls lock. + make_pre_finalization_calls(tstate, /*subinterpreters=*/1); assert(_PyThreadState_GET() == tstate); @@ -2073,7 +2175,7 @@ _Py_Finalize(_PyRuntimeState *runtime) #endif /* Ensure that remaining threads are detached */ - _PyEval_StopTheWorldAll(runtime); + ASSERT_WORLD_STOPPED(tstate->interp); /* Remaining daemon threads will be trapped in PyThread_hang_thread when they attempt to take the GIL (ex: PyEval_RestoreThread()). */ @@ -2094,6 +2196,7 @@ _Py_Finalize(_PyRuntimeState *runtime) _PyThreadState_SetShuttingDown(p); } _PyEval_StartTheWorldAll(runtime); + PyMutex_Unlock(&tstate->interp->ceval.pending.mutex); /* Clear frames of other threads to call objects destructors. Destructors will be called in the current Python thread. Since @@ -2449,15 +2552,10 @@ Py_EndInterpreter(PyThreadState *tstate) } interp->finalizing = 1; - // Wrap up existing "threading"-module-created, non-daemon threads. - wait_for_thread_shutdown(tstate); - - // Make any remaining pending calls. - _Py_FinishPendingCalls(tstate); + // This call stops the world and takes the pending calls lock. + make_pre_finalization_calls(tstate, /*subinterpreters=*/0); - _PyAtExit_Call(tstate->interp); - _PyRuntimeState *runtime = interp->runtime; - _PyEval_StopTheWorldAll(runtime); + ASSERT_WORLD_STOPPED(interp); /* Remaining daemon threads will automatically exit when they attempt to take the GIL (ex: PyEval_RestoreThread()). */ _PyInterpreterState_SetFinalizing(interp, tstate); @@ -2467,7 +2565,8 @@ Py_EndInterpreter(PyThreadState *tstate) _PyThreadState_SetShuttingDown(p); } - _PyEval_StartTheWorldAll(runtime); + _PyEval_StartTheWorldAll(interp->runtime); + PyMutex_Unlock(&interp->ceval.pending.mutex); _PyThreadState_DeleteList(list, /*is_after_fork=*/0); // XXX Call something like _PyImport_Disable() here? diff --git a/Python/pystate.c b/Python/pystate.c index f43989b0181b1a..29c713dccc9fe8 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -1461,7 +1461,7 @@ init_threadstate(_PyThreadStateImpl *_tstate, assert(tstate->prev == NULL); assert(tstate->_whence == _PyThreadState_WHENCE_NOTSET); - assert(whence >= 0 && whence <= _PyThreadState_WHENCE_EXEC); + assert(whence >= 0 && whence <= _PyThreadState_WHENCE_THREADING_DAEMON); tstate->_whence = whence; assert(id > 0);