Skip to content

Commit d0fbbc2

Browse files
add interp llist
1 parent f863cd5 commit d0fbbc2

File tree

2 files changed

+50
-43
lines changed

2 files changed

+50
-43
lines changed

Include/internal/pycore_interp.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,9 @@ struct _is {
227227
PyMutex weakref_locks[NUM_WEAKREF_LIST_LOCKS];
228228
_PyIndexPool tlbc_indices;
229229
#endif
230-
230+
// Per-interpreter list of tasks, any lingering tasks from thread
231+
// states gets added here and removed from the corresponding
232+
// thread state's list.
231233
struct llist_node asyncio_tasks_head;
232234
// Per-interpreter state for the obmalloc allocator. For the main
233235
// interpreter and for all interpreters that don't have their
@@ -281,7 +283,6 @@ struct _is {
281283
struct _Py_interp_cached_objects cached_objects;
282284
struct _Py_interp_static_objects static_objects;
283285

284-
285286
Py_ssize_t _interactive_src_count;
286287

287288
/* the initial PyInterpreterState.threads.head */

Modules/_asynciomodule.c

Lines changed: 47 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ typedef struct TaskObj {
6262
PyObject *task_context;
6363
struct llist_node task_node;
6464
#ifdef Py_GIL_DISABLED
65+
// thread id of the thread where this task was created
6566
uintptr_t task_tid;
6667
#endif
6768
} TaskObj;
@@ -2001,6 +2002,17 @@ register_eager_task(asyncio_state *state, PyObject *task)
20012002
return PySet_Add(state->eager_tasks, task);
20022003
}
20032004

2005+
static inline void
2006+
unregister_task_safe(TaskObj *task)
2007+
{
2008+
if (task->task_node.next == NULL) {
2009+
// not registered
2010+
assert(task->task_node.prev == NULL);
2011+
return;
2012+
}
2013+
llist_remove(&task->task_node);
2014+
}
2015+
20042016
static void
20052017
unregister_task(asyncio_state *state, TaskObj *task)
20062018
{
@@ -2009,34 +2021,18 @@ unregister_task(asyncio_state *state, TaskObj *task)
20092021
// check if we are in the same thread
20102022
// if so, we can avoid locking
20112023
if (task->task_tid == _Py_ThreadId()) {
2012-
if (task->task_node.next == NULL) {
2013-
// not registered
2014-
assert(task->task_node.prev == NULL);
2015-
return;
2016-
}
2017-
llist_remove(&task->task_node);
2024+
unregister_task_safe(task);
20182025
}
20192026
else {
20202027
// we are in a different thread
20212028
// stop the world then check and remove the task
20222029
PyThreadState *tstate = _PyThreadState_GET();
20232030
_PyEval_StopTheWorld(tstate->interp);
2024-
if (task->task_node.next == NULL) {
2025-
// not registered
2026-
assert(task->task_node.prev == NULL);
2027-
}
2028-
else {
2029-
llist_remove(&task->task_node);
2030-
}
2031+
unregister_task_safe(task);
20312032
_PyEval_StartTheWorld(tstate->interp);
20322033
}
20332034
#else
2034-
if (task->task_node.next == NULL) {
2035-
// not registered
2036-
assert(task->task_node.prev == NULL);
2037-
return;
2038-
}
2039-
llist_remove(&task->task_node);
2035+
unregister_task_safe(task);
20402036
#endif
20412037
}
20422038

@@ -3719,6 +3715,7 @@ _asyncio_current_task_impl(PyObject *module, PyObject *loop)
37193715
static inline int
37203716
add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *loop)
37213717
{
3718+
assert(PySet_CheckExact(tasks));
37223719
PyObject *done = PyObject_CallMethodNoArgs(task, &_Py_ID(done));
37233720
if (done == NULL) {
37243721
return -1;
@@ -3742,8 +3739,16 @@ add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *lo
37423739
}
37433740

37443741
static inline int
3745-
add_tasks_from_head(struct llist_node *head, PyListObject *tasks)
3742+
add_tasks_interp(PyInterpreterState *interp, PyListObject *tasks)
37463743
{
3744+
#ifdef Py_GIL_DISABLED
3745+
assert(interp->stoptheworld.world_stopped);
3746+
#endif
3747+
// Start traversing from interpreter's linked list
3748+
struct llist_node *head = &interp->asyncio_tasks_head;
3749+
_PyThreadStateImpl *thead = (_PyThreadStateImpl *)interp->threads.head;
3750+
3751+
traverse:
37473752
struct llist_node *node;
37483753
llist_for_each_safe(node, head) {
37493754
TaskObj *task = llist_data(node, TaskObj, task_node);
@@ -3756,14 +3761,17 @@ add_tasks_from_head(struct llist_node *head, PyListObject *tasks)
37563761
// otherwise it gets added to the list.
37573762
if (_Py_TryIncref((PyObject *)task)) {
37583763
if (_PyList_AppendTakeRef((PyListObject *)tasks, (PyObject *)task) < 0) {
3759-
// do not call any escaping function such as Py_DECREF
3760-
// while holding the runtime lock, instead set err=1 and
3761-
// call them after releasing the runtime lock
3762-
// and starting the world to avoid any deadlocks.
3764+
// do not call any escaping calls here while holding the runtime lock.
37633765
return -1;
37643766
}
37653767
}
37663768
}
3769+
// traverse the linked lists of thread states
3770+
if (thead != NULL) {
3771+
head = &thead->asyncio_tasks_head;
3772+
thead = (_PyThreadStateImpl *)thead->base.next;
3773+
goto traverse;
3774+
}
37673775
return 0;
37683776
}
37693777

@@ -3805,31 +3813,29 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop)
38053813
Py_DECREF(loop);
38063814
return NULL;
38073815
}
3808-
int err = 0;
38093816
PyInterpreterState *interp = PyInterpreterState_Get();
38103817
// Stop the world and traverse the per-thread linked list
3811-
// of asyncio tasks of all threads and add them to the list.
3818+
// of asyncio tasks of all threads and the interpreter's
3819+
// linked list and them to tasks list.
3820+
// The interpreter linked list is used for any lingering tasks
3821+
// whose thread state has been deallocated but the task is
3822+
// still alive. This can happen if task is referenced by a
3823+
// different thread, in which case the task is moved to the
3824+
// interpreter's linked list from the thread's linked list
3825+
// before deallocation.
38123826
// Stop the world pause is required so that no thread
38133827
// modifies it's linked list while being iterated here
38143828
// concurrently.
38153829
// This design allows for lock free register/unregister of tasks
3816-
// of loops running concurrently in different threads.
3830+
// of loops running concurrently in different threads (general case).
38173831
_PyEval_StopTheWorld(interp);
3818-
_Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
3819-
_PyThreadStateImpl *tstate = (_PyThreadStateImpl *)p;
3820-
struct llist_node *head = &tstate->asyncio_tasks_head;
3821-
if (add_tasks_from_head(head, (PyListObject *)tasks) < 0) {
3822-
err = 1;
3823-
break;
3824-
}
3825-
}
3826-
// traverse the linked list of the interpreter
3827-
if (err == 0 && add_tasks_from_head(&interp->asyncio_tasks_head, (PyListObject *)tasks) < 0) {
3828-
err = 1;
3829-
}
3830-
_Py_FOR_EACH_TSTATE_END(interp);
3832+
HEAD_LOCK(interp->runtime);
3833+
int ret = add_tasks_interp(interp, (PyListObject *)tasks);
3834+
HEAD_UNLOCK(interp->runtime);
38313835
_PyEval_StartTheWorld(interp);
3832-
if (err) {
3836+
if (ret < 0) {
3837+
// call any escaping calls after releasing the runtime lock
3838+
// and starting the world to avoid any deadlocks.
38333839
Py_DECREF(tasks);
38343840
Py_DECREF(loop);
38353841
return NULL;

0 commit comments

Comments
 (0)