Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Include/internal/pycore_interp.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ struct _is {
uint64_t next_unique_id;
/* The linked list of threads, newest first. */
PyThreadState *head;
struct {
PyMutex mutex;
_PyThreadStateImpl *head;
} freelist;
/* The thread currently executing in the __main__ module, if any. */
PyThreadState *main;
/* Used in Modules/_threadmodule.c. */
Expand Down Expand Up @@ -278,9 +282,10 @@ struct _is {
struct _Py_interp_cached_objects cached_objects;
struct _Py_interp_static_objects static_objects;

Py_ssize_t _interactive_src_count;

/* the initial PyInterpreterState.threads.head */
_PyThreadStateImpl _initial_thread;
Py_ssize_t _interactive_src_count;
};


Expand Down
5 changes: 5 additions & 0 deletions Include/internal/pycore_runtime_init.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ extern PyTypeObject _PyExc_MemoryError;
{ \
.id_refcount = -1, \
._whence = _PyInterpreterState_WHENCE_NOTSET, \
.threads = { \
.freelist = { \
.head = &(INTERP)._initial_thread, \
}, \
}, \
.imports = IMPORTS_INIT, \
.ceval = { \
.recursion_limit = Py_DEFAULT_RECURSION_LIMIT, \
Expand Down
17 changes: 17 additions & 0 deletions Lib/test/test_interpreters/test_stress.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,23 @@ def task():
with threading_helper.start_threads(threads):
pass

@support.requires_resource('cpu')
def test_many_threads_running_interp_in_other_interp(self):
interp = interpreters.create()

script = f"""if True:
import _interpreters
_interpreters.run_string({interp.id}, '1')
"""

def run():
interp = interpreters.create()
interp.exec(script)

threads = (threading.Thread(target=run) for _ in range(200))
with threading_helper.start_threads(threads):
pass


if __name__ == '__main__':
# Test needs to be a package, so we can do relative imports.
Expand Down
107 changes: 59 additions & 48 deletions Python/pystate.c
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ _PyRuntimeState_ReInitThreads(_PyRuntimeState *runtime)
for (PyInterpreterState *interp = runtime->interpreters.head;
interp != NULL; interp = interp->next)
{
_PyMutex_at_fork_reinit(&interp->threads.freelist.mutex);
for (int i = 0; i < NUM_WEAKREF_LIST_LOCKS; i++) {
_PyMutex_at_fork_reinit(&interp->weakref_locks[i]);
}
Expand Down Expand Up @@ -629,6 +630,8 @@ init_interpreter(PyInterpreterState *interp,
assert(next != NULL || (interp == runtime->interpreters.main));
interp->next = next;

interp->threads.freelist.head = &interp->_initial_thread;

// We would call _PyObject_InitState() at this point
// if interp->feature_flags were alredy set.

Expand Down Expand Up @@ -766,7 +769,6 @@ PyInterpreterState_New(void)
return interp;
}


static void
interpreter_clear(PyInterpreterState *interp, PyThreadState *tstate)
{
Expand Down Expand Up @@ -910,6 +912,9 @@ interpreter_clear(PyInterpreterState *interp, PyThreadState *tstate)
// XXX Once we have one allocator per interpreter (i.e.
// per-interpreter GC) we must ensure that all of the interpreter's
// objects have been cleaned up at the point.

// We could clear interp->threads.freelist here
// if it held more than just the initial thread state.
}


Expand Down Expand Up @@ -1386,28 +1391,62 @@ allocate_chunk(int size_in_bytes, _PyStackChunk* previous)
return res;
}

static _PyThreadStateImpl *
alloc_threadstate(void)
static void
reset_threadstate(_PyThreadStateImpl *tstate)
{
return PyMem_RawCalloc(1, sizeof(_PyThreadStateImpl));
// Set to _PyThreadState_INIT directly?
memcpy(tstate,
&initial._main_interpreter._initial_thread,
sizeof(*tstate));
}

#define LOCK_THREADS_FREELIST(interp) \
PyMutex_LockFlags(&(interp)->threads.freelist.mutex, _Py_LOCK_DONT_DETACH)
#define UNLOCK_THREADS_FREELIST(interp) \
PyMutex_Unlock(&(interp)->threads.freelist.mutex)

static _PyThreadStateImpl *
alloc_threadstate(PyInterpreterState *interp)
{
// Try the freelist first.
_PyThreadStateImpl *tstate = NULL;
LOCK_THREADS_FREELIST(interp);
if (interp->threads.freelist.head != NULL) {
tstate = interp->threads.freelist.head;
interp->threads.freelist.head =
(_PyThreadStateImpl *)tstate->base.next;
}
UNLOCK_THREADS_FREELIST(interp);
// Fall back to the allocator.
if (tstate == NULL) {
tstate = PyMem_RawCalloc(1, sizeof(_PyThreadStateImpl));
reset_threadstate(tstate);
}
return tstate;
}

static void
free_threadstate(_PyThreadStateImpl *tstate)
{
PyInterpreterState *interp = tstate->base.interp;
// The initial thread state of the interpreter is allocated
// as part of the interpreter state so should not be freed.
if (tstate == &tstate->base.interp->_initial_thread) {
// Restore to _PyThreadState_INIT.
memcpy(tstate,
&initial._main_interpreter._initial_thread,
sizeof(*tstate));
if (tstate == &interp->_initial_thread) {
// Add it to the freelist.
reset_threadstate(tstate);
LOCK_THREADS_FREELIST(interp);
tstate->base.next = (PyThreadState *)interp->threads.freelist.head;
interp->threads.freelist.head = tstate;
UNLOCK_THREADS_FREELIST(interp);
}
else {
PyMem_RawFree(tstate);
}
}

#undef LOCK_THREADS_FREELIST
#undef UNLOCK_THREADS_FREELIST

/* Get the thread state to a minimal consistent state.
Further init happens in pylifecycle.c before it can be used.
All fields not initialized here are expected to be zeroed out,
Expand Down Expand Up @@ -1492,66 +1531,38 @@ add_threadstate(PyInterpreterState *interp, PyThreadState *tstate,
static PyThreadState *
new_threadstate(PyInterpreterState *interp, int whence)
{
_PyThreadStateImpl *tstate;
_PyRuntimeState *runtime = interp->runtime;
// We don't need to allocate a thread state for the main interpreter
// (the common case), but doing it later for the other case revealed a
// reentrancy problem (deadlock). So for now we always allocate before
// taking the interpreters lock. See GH-96071.
_PyThreadStateImpl *new_tstate = alloc_threadstate();
int used_newtstate;
if (new_tstate == NULL) {
// Allocate the thread state.
_PyThreadStateImpl *tstate = alloc_threadstate(interp);
if (tstate == NULL) {
return NULL;
}

#ifdef Py_GIL_DISABLED
Py_ssize_t qsbr_idx = _Py_qsbr_reserve(interp);
if (qsbr_idx < 0) {
PyMem_RawFree(new_tstate);
free_threadstate(tstate);
return NULL;
}
int32_t tlbc_idx = _Py_ReserveTLBCIndex(interp);
if (tlbc_idx < 0) {
PyMem_RawFree(new_tstate);
free_threadstate(tstate);
return NULL;
}
#endif

/* We serialize concurrent creation to protect global state. */
HEAD_LOCK(runtime);
HEAD_LOCK(interp->runtime);

// Initialize the new thread state.
interp->threads.next_unique_id += 1;
uint64_t id = interp->threads.next_unique_id;
init_threadstate(tstate, interp, id, whence);

// Allocate the thread state and add it to the interpreter.
// Add the new thread state to the interpreter.
PyThreadState *old_head = interp->threads.head;
if (old_head == NULL) {
// It's the interpreter's initial thread state.
used_newtstate = 0;
tstate = &interp->_initial_thread;
}
// XXX Re-use interp->_initial_thread if not in use?
else {
// Every valid interpreter must have at least one thread.
assert(id > 1);
assert(old_head->prev == NULL);
used_newtstate = 1;
tstate = new_tstate;
// Set to _PyThreadState_INIT.
memcpy(tstate,
&initial._main_interpreter._initial_thread,
sizeof(*tstate));
}

init_threadstate(tstate, interp, id, whence);
add_threadstate(interp, (PyThreadState *)tstate, old_head);

HEAD_UNLOCK(runtime);
if (!used_newtstate) {
// Must be called with lock unlocked to avoid re-entrancy deadlock.
PyMem_RawFree(new_tstate);
}
else {
}
HEAD_UNLOCK(interp->runtime);

#ifdef Py_GIL_DISABLED
// Must be called with lock unlocked to avoid lock ordering deadlocks.
Expand Down
Loading