Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Include/internal/pycore_interp.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ struct _is {
uint64_t next_unique_id;
/* The linked list of threads, newest first. */
PyThreadState *head;
_PyThreadStateImpl *preallocated;
/* The thread currently executing in the __main__ module, if any. */
PyThreadState *main;
/* Used in Modules/_threadmodule.c. */
Expand Down Expand Up @@ -278,9 +279,10 @@ struct _is {
struct _Py_interp_cached_objects cached_objects;
struct _Py_interp_static_objects static_objects;

Py_ssize_t _interactive_src_count;

/* the initial PyInterpreterState.threads.head */
_PyThreadStateImpl _initial_thread;
Py_ssize_t _interactive_src_count;
};


Expand Down
3 changes: 3 additions & 0 deletions Include/internal/pycore_runtime_init.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ extern PyTypeObject _PyExc_MemoryError;
{ \
.id_refcount = -1, \
._whence = _PyInterpreterState_WHENCE_NOTSET, \
.threads = { \
.preallocated = &(INTERP)._initial_thread, \
}, \
.imports = IMPORTS_INIT, \
.ceval = { \
.recursion_limit = Py_DEFAULT_RECURSION_LIMIT, \
Expand Down
30 changes: 30 additions & 0 deletions Lib/test/test_interpreters/test_stress.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def test_create_many_sequential(self):
alive.append(interp)

@support.requires_resource('cpu')
@threading_helper.requires_working_threading()
def test_create_many_threaded(self):
alive = []
def task():
Expand All @@ -32,6 +33,35 @@ def task():
with threading_helper.start_threads(threads):
pass

# Stress test: many threads, each owning its own interpreter, keep trying to
# run code in one shared interpreter until every thread has succeeded once.
@support.requires_resource('cpu')
@threading_helper.requires_working_threading()
def test_many_threads_running_interp_in_other_interp(self):
# The shared target interpreter that every worker's script runs code in.
interp = interpreters.create()

# Source executed inside each worker's interpreter: it runs the trivial
# statement '1' in the shared interpreter, addressed by its id.
script = f"""if True:
import _interpreters
_interpreters.run_string({interp.id}, '1')
"""

def run():
# Each worker creates a private interpreter.  This name shadows the outer
# `interp`; `script` was already formatted with the shared interpreter's id.
interp = interpreters.create()
# NOTE(review): `alreadyrunning` is never read in the visible code --
# confirm whether it is still needed.
alreadyrunning = (f'{interpreters.InterpreterError}: '
'interpreter already running')
success = False
# Retry until one exec() of the script finishes without ExecutionFailed.
while not success:
try:
interp.exec(script)
except interpreters.ExecutionFailed as exc:
# Only the "interpreter already running" failure is tolerated and
# retried; any other failure propagates out of the worker.
if exc.excinfo.msg != 'interpreter already running':
raise # re-raise
assert exc.excinfo.type.__name__ == 'InterpreterError'
else:
success = True

# 200 worker threads, all running `run`, managed by start_threads().
threads = (threading.Thread(target=run) for _ in range(200))
with threading_helper.start_threads(threads):
pass


if __name__ == '__main__':
# Test needs to be a package, so we can do relative imports.
Expand Down
93 changes: 46 additions & 47 deletions Python/pystate.c
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,8 @@ init_interpreter(PyInterpreterState *interp,
assert(next != NULL || (interp == runtime->interpreters.main));
interp->next = next;

interp->threads.preallocated = &interp->_initial_thread;

// We would call _PyObject_InitState() at this point
// if interp->feature_flags were already set.

Expand Down Expand Up @@ -766,7 +768,6 @@ PyInterpreterState_New(void)
return interp;
}


static void
interpreter_clear(PyInterpreterState *interp, PyThreadState *tstate)
{
Expand Down Expand Up @@ -910,6 +911,9 @@ interpreter_clear(PyInterpreterState *interp, PyThreadState *tstate)
// XXX Once we have one allocator per interpreter (i.e.
// per-interpreter GC) we must ensure that all of the interpreter's
// objects have been cleaned up at the point.

// We could clear interp->threads.preallocated here
// if it held more than just the initial thread state.
}


Expand Down Expand Up @@ -1386,22 +1390,45 @@ allocate_chunk(int size_in_bytes, _PyStackChunk* previous)
return res;
}

/* Overwrite *tstate with a byte-for-byte copy of the template thread state
 * kept at initial._main_interpreter._initial_thread, discarding whatever
 * the structure held before.
 */
static void
reset_threadstate(_PyThreadStateImpl *tstate)
{
    // XXX Could this be set to _PyThreadState_INIT directly instead?
    const _PyThreadStateImpl *pristine =
        &initial._main_interpreter._initial_thread;
    memcpy(tstate, pristine, sizeof(*pristine));
}

static _PyThreadStateImpl *
alloc_threadstate(void)
alloc_threadstate(PyInterpreterState *interp)
{
return PyMem_RawCalloc(1, sizeof(_PyThreadStateImpl));
_PyThreadStateImpl *tstate;

// Try the preallocated tstate first.
tstate = _Py_atomic_exchange_ptr(&interp->threads.preallocated, NULL);

// Fall back to the allocator.
if (tstate == NULL) {
tstate = PyMem_RawCalloc(1, sizeof(_PyThreadStateImpl));
if (tstate == NULL) {
return NULL;
}
reset_threadstate(tstate);
}
return tstate;
}

static void
free_threadstate(_PyThreadStateImpl *tstate)
{
PyInterpreterState *interp = tstate->base.interp;
// The initial thread state of the interpreter is allocated
// as part of the interpreter state so should not be freed.
if (tstate == &tstate->base.interp->_initial_thread) {
// Restore to _PyThreadState_INIT.
memcpy(tstate,
&initial._main_interpreter._initial_thread,
sizeof(*tstate));
if (tstate == &interp->_initial_thread) {
// Make it available again.
reset_threadstate(tstate);
assert(interp->threads.preallocated == NULL);
_Py_atomic_store_ptr(&interp->threads.preallocated, tstate);
}
else {
PyMem_RawFree(tstate);
Expand Down Expand Up @@ -1492,66 +1519,38 @@ add_threadstate(PyInterpreterState *interp, PyThreadState *tstate,
static PyThreadState *
new_threadstate(PyInterpreterState *interp, int whence)
{
_PyThreadStateImpl *tstate;
_PyRuntimeState *runtime = interp->runtime;
// We don't need to allocate a thread state for the main interpreter
// (the common case), but doing it later for the other case revealed a
// reentrancy problem (deadlock). So for now we always allocate before
// taking the interpreters lock. See GH-96071.
_PyThreadStateImpl *new_tstate = alloc_threadstate();
int used_newtstate;
if (new_tstate == NULL) {
// Allocate the thread state.
_PyThreadStateImpl *tstate = alloc_threadstate(interp);
if (tstate == NULL) {
return NULL;
}

#ifdef Py_GIL_DISABLED
Py_ssize_t qsbr_idx = _Py_qsbr_reserve(interp);
if (qsbr_idx < 0) {
PyMem_RawFree(new_tstate);
free_threadstate(tstate);
return NULL;
}
int32_t tlbc_idx = _Py_ReserveTLBCIndex(interp);
if (tlbc_idx < 0) {
PyMem_RawFree(new_tstate);
free_threadstate(tstate);
return NULL;
}
#endif

/* We serialize concurrent creation to protect global state. */
HEAD_LOCK(runtime);
HEAD_LOCK(interp->runtime);

// Initialize the new thread state.
interp->threads.next_unique_id += 1;
uint64_t id = interp->threads.next_unique_id;
init_threadstate(tstate, interp, id, whence);

// Allocate the thread state and add it to the interpreter.
// Add the new thread state to the interpreter.
PyThreadState *old_head = interp->threads.head;
if (old_head == NULL) {
// It's the interpreter's initial thread state.
used_newtstate = 0;
tstate = &interp->_initial_thread;
}
// XXX Re-use interp->_initial_thread if not in use?
else {
// Every valid interpreter must have at least one thread.
assert(id > 1);
assert(old_head->prev == NULL);
used_newtstate = 1;
tstate = new_tstate;
// Set to _PyThreadState_INIT.
memcpy(tstate,
&initial._main_interpreter._initial_thread,
sizeof(*tstate));
}

init_threadstate(tstate, interp, id, whence);
add_threadstate(interp, (PyThreadState *)tstate, old_head);

HEAD_UNLOCK(runtime);
if (!used_newtstate) {
// Must be called with lock unlocked to avoid re-entrancy deadlock.
PyMem_RawFree(new_tstate);
}
else {
}
HEAD_UNLOCK(interp->runtime);

#ifdef Py_GIL_DISABLED
// Must be called with lock unlocked to avoid lock ordering deadlocks.
Expand Down
Loading