Commit 237a089

add a per interp tasks list
1 parent acef821 commit 237a089

3 files changed: +73 -30 lines changed

Include/internal/pycore_interp.h

Lines changed: 2 additions & 0 deletions
@@ -228,6 +228,7 @@ struct _is {
     _PyIndexPool tlbc_indices;
 #endif
 
+    struct llist_node asyncio_tasks_head;
     // Per-interpreter state for the obmalloc allocator. For the main
     // interpreter and for all interpreters that don't have their
     // own obmalloc state, this points to the static structure in
@@ -280,6 +281,7 @@ struct _is {
     struct _Py_interp_cached_objects cached_objects;
     struct _Py_interp_static_objects static_objects;
 
+
     Py_ssize_t _interactive_src_count;
 
     /* the initial PyInterpreterState.threads.head */
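The new asyncio_tasks_head field is the head of an intrusive doubly linked list, the same primitive already used for the per-thread task lists. For context, here is a minimal sketch of the node type and the self-referential "empty" initialization, modeled on Include/internal/pycore_llist.h (the real header may differ in detail):

/* Sketch of the intrusive list primitives assumed by asyncio_tasks_head;
 * modeled on Include/internal/pycore_llist.h, not copied from it. */
struct llist_node {
    struct llist_node *next;
    struct llist_node *prev;
};

/* An empty list is a head that points at itself in both directions,
 * which is why init_interpreter() (see Python/pystate.c below) calls
 * llist_init() on the new head. */
static inline void
llist_init(struct llist_node *head)
{
    head->next = head;
    head->prev = head;
}

/* A list is empty when the head is its own successor. */
static inline int
llist_empty(struct llist_node *head)
{
    return head->next == head;
}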

Modules/_asynciomodule.c

Lines changed: 63 additions & 24 deletions
@@ -61,6 +61,9 @@ typedef struct TaskObj {
     PyObject *task_name;
     PyObject *task_context;
     struct llist_node task_node;
+#ifdef Py_GIL_DISABLED
+    uintptr_t task_tid;
+#endif
 } TaskObj;
 
 typedef struct {
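The new task_tid field records the id of the thread that created the task (it is filled in by the _asyncio_Task___init___impl hunk further down). On free-threaded builds it lets unregister_task() distinguish the cheap same-thread case from the cross-thread case. A hedged sketch of that check, using a hypothetical helper name purely for illustration; the diff performs the comparison inline:

/* Hypothetical helper, for illustration only. */
static inline int
task_in_owning_thread(TaskObj *task)
{
#ifdef Py_GIL_DISABLED
    /* _Py_ThreadId() returns a uintptr_t identifying the current thread. */
    return task->task_tid == _Py_ThreadId();
#else
    /* With the GIL, list mutation is already serialized. */
    return 1;
#endif
}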
@@ -2002,11 +2005,31 @@ static void
 unregister_task(asyncio_state *state, TaskObj *task)
 {
     assert(Task_Check(state, task));
-    if (task->task_node.next == NULL) {
-        // not registered
-        assert(task->task_node.prev == NULL);
-        return;
+#ifdef Py_GIL_DISABLED
+    // check if we are in the same thread
+    // if so, we can avoid locking
+    if (task->task_tid == _Py_ThreadId()) {
+        if (task->task_node.next == NULL) {
+            // not registered
+            assert(task->task_node.prev == NULL);
+            return;
+        }
+        llist_remove(&task->task_node);
+    }
+    else {
+        // we are in a different thread
+        // stop the world then check and remove the task
+        PyThreadState *tstate = _PyThreadState_GET();
+        _PyEval_StopTheWorld(tstate->interp);
+        if (task->task_node.next == NULL) {
+            // not registered
+            assert(task->task_node.prev == NULL);
+        }
+        else {
+            llist_remove(&task->task_node);
+        }
     }
+#endif
     llist_remove(&task->task_node);
 }
 
@@ -2162,6 +2185,9 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop,
     }
 
     Py_CLEAR(self->task_fut_waiter);
+#ifdef Py_GIL_DISABLED
+    self->task_tid = _Py_ThreadId();
+#endif
     self->task_must_cancel = 0;
     self->task_log_destroy_pending = 1;
     self->task_num_cancels_requested = 0;
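Registration itself is untouched by this commit: a task is still linked into the creating thread's asyncio_tasks_head, which is the list unregister_task() above removes it from. A hedged sketch of that registration counterpart, modeled on the existing register_task() in this file (the real function may differ in detail):

/* Sketch of the registration side, shown for context only;
 * modeled on the existing register_task() in Modules/_asynciomodule.c. */
static void
register_task_sketch(TaskObj *task)
{
    if (task->task_node.next != NULL) {
        // already registered
        assert(task->task_node.prev != NULL);
        return;
    }
    // link the task into the list owned by the current thread state
    _PyThreadStateImpl *tstate = (_PyThreadStateImpl *)_PyThreadState_GET();
    llist_insert_tail(&tstate->asyncio_tasks_head, &task->task_node);
}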
@@ -3708,6 +3734,32 @@ add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *lo
     return 0;
 }
 
+static inline int
+add_tasks_from_head(struct llist_node *head, PyListObject *tasks)
+{
+    struct llist_node *node;
+    llist_for_each_safe(node, head) {
+        TaskObj *task = llist_data(node, TaskObj, task_node);
+        // The linked list holds borrowed references to task
+        // as such it is possible that the task is concurrently
+        // deallocated while added to this list.
+        // To protect against concurrent deallocations,
+        // we first try to incref the task which would fail
+        // if it is concurrently getting deallocated in another thread,
+        // otherwise it gets added to the list.
+        if (_Py_TryIncref((PyObject *)task)) {
+            if (_PyList_AppendTakeRef((PyListObject *)tasks, (PyObject *)task) < 0) {
+                // do not call any escaping function such as Py_DECREF
+                // while holding the runtime lock, instead set err=1 and
+                // call them after releasing the runtime lock
+                // and starting the world to avoid any deadlocks.
+                return -1;
+            }
+        }
+    }
+    return 0;
+}
+
 /*********************** Module **************************/
 
 /*[clinic input]
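The helper relies on _Py_TryIncref() to turn the list's borrowed references into strong ones without racing against deallocation. A conceptual sketch of the contract it provides (the real implementation lives in CPython's internal headers and is considerably more involved on free-threaded builds):

/* Conceptual sketch only; a real implementation must perform the check and
 * the increment atomically, and the actual _Py_TryIncref() also deals with
 * deferred and per-thread reference counts on free-threaded builds. */
static int
try_incref_sketch(PyObject *op)
{
    if (Py_REFCNT(op) == 0) {
        // the object is already being deallocated in some thread;
        // taking a new reference now would resurrect a dying object
        return 0;
    }
    Py_INCREF(op);   // caller now owns a strong reference
    return 1;
}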
@@ -3756,31 +3808,18 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop)
     // This design allows for lock free register/unregister of tasks
     // of loops running concurrently in different threads.
     _PyEval_StopTheWorld(interp);
-    struct llist_node *node;
     _Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
         _PyThreadStateImpl *tstate = (_PyThreadStateImpl *)p;
         struct llist_node *head = &tstate->asyncio_tasks_head;
-        llist_for_each_safe(node, head) {
-            TaskObj *task = llist_data(node, TaskObj, task_node);
-            // The linked list holds borrowed references to task
-            // as such it is possible that the task is concurrently
-            // deallocated while added to this list.
-            // To protect against concurrent deallocations,
-            // we first try to incref the task which would fail
-            // if it is concurrently getting deallocated in another thread,
-            // otherwise it gets added to the list.
-            if (_Py_TryIncref((PyObject *)task)) {
-                if (_PyList_AppendTakeRef((PyListObject *)tasks, (PyObject *)task) < 0) {
-                    // do not call any escaping function such as Py_DECREF
-                    // while holding the runtime lock, instead set err=1 and
-                    // call them after releasing the runtime lock
-                    // and starting the world to avoid any deadlocks.
-                    err = 1;
-                    break;
-                }
-            }
+        if (add_tasks_from_head(head, (PyListObject *)tasks) < 0) {
+            err = 1;
+            break;
         }
     }
+    // traverse the linked list of the interpreter
+    if (err == 0 && add_tasks_from_head(&interp->asyncio_tasks_head, (PyListObject *)tasks) < 0) {
+        err = 1;
+    }
     _Py_FOR_EACH_TSTATE_END(interp);
     _PyEval_StartTheWorld(interp);
     if (err) {
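Read without the diff markers, the resulting traversal in _asyncio_all_tasks_impl() looks roughly like this (a reconstruction from the hunk above, not a standalone function; the surrounding setup of tasks and err and the error handling after _PyEval_StartTheWorld() are unchanged and omitted):

/* Reconstruction of the relevant body of _asyncio_all_tasks_impl() after
 * this change, for readability only. */
_PyEval_StopTheWorld(interp);
_Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
    _PyThreadStateImpl *tstate = (_PyThreadStateImpl *)p;
    // tasks registered by thread states that are still alive
    struct llist_node *head = &tstate->asyncio_tasks_head;
    if (add_tasks_from_head(head, (PyListObject *)tasks) < 0) {
        err = 1;
        break;
    }
}
// tasks whose owning thread state was already cleared have been merged
// into the per-interpreter list (see the Python/pystate.c hunk below)
if (err == 0 && add_tasks_from_head(&interp->asyncio_tasks_head, (PyListObject *)tasks) < 0) {
    err = 1;
}
_Py_FOR_EACH_TSTATE_END(interp);
_PyEval_StartTheWorld(interp);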

Python/pystate.c

Lines changed: 8 additions & 6 deletions
@@ -643,6 +643,7 @@ init_interpreter(PyInterpreterState *interp,
     _Py_brc_init_state(interp);
 #endif
     llist_init(&interp->mem_free_queue.head);
+    llist_init(&interp->asyncio_tasks_head);
     for (int i = 0; i < _PY_MONITORING_UNGROUPED_EVENTS; i++) {
         interp->monitors.tools[i] = 0;
     }
@@ -1698,12 +1699,13 @@ PyThreadState_Clear(PyThreadState *tstate)
 
     Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_loop);
 
-    struct llist_node *node;
-    // Clear any lingering tasks so that `TaskObj_finalize` doesn't
-    // try to unregister task from a freed list.
-    llist_for_each_safe(node, &((_PyThreadStateImpl *)tstate)->asyncio_tasks_head) {
-        llist_remove(node);
-    }
+
+    _PyEval_StopTheWorld(tstate->interp);
+    // merge any lingering tasks from thread state to interpreter's
+    // tasks list
+    llist_concat(&tstate->interp->asyncio_tasks_head,
+                 &((_PyThreadStateImpl *)tstate)->asyncio_tasks_head);
+    _PyEval_StartTheWorld(tstate->interp);
 
     Py_CLEAR(tstate->dict);
     Py_CLEAR(tstate->async_exc);
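Instead of simply unlinking lingering tasks when a thread state is cleared, they are now spliced onto the interpreter-wide list, so TaskObj_finalize can still unregister them and _asyncio_all_tasks_impl() can still report them. A hedged sketch of the splicing semantics assumed of llist_concat(), reusing the llist_init()/llist_empty() shapes sketched earlier (the real helper lives in Include/internal/pycore_llist.h and may be written differently):

/* Sketch of the semantics assumed of llist_concat(): move every node of
 * list2 onto the tail of list1 and leave list2 empty, so the soon-to-be-freed
 * per-thread head no longer links to any task. Modeled on
 * Include/internal/pycore_llist.h, not copied from it. */
static inline void
llist_concat_sketch(struct llist_node *head1, struct llist_node *head2)
{
    if (!llist_empty(head2)) {
        head1->prev->next = head2->next;   // old tail of list1 -> first of list2
        head2->next->prev = head1->prev;
        head1->prev = head2->prev;         // last of list2 becomes tail of list1
        head2->prev->next = head1;
        llist_init(head2);                 // list2 is now empty
    }
}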
