@@ -67,6 +67,10 @@ typedef struct TaskObj {
6767 PyObject * task_name ;
6868 PyObject * task_context ;
6969 struct llist_node task_node ;
70+ #ifdef Py_GIL_DISABLED
71+ // thread id of the thread where this task was created
72+ uintptr_t task_tid ;
73+ #endif
7074} TaskObj ;
7175
7276typedef struct {
@@ -94,14 +98,6 @@ typedef struct {
9498 || PyObject_TypeCheck(obj, state->FutureType) \
9599 || PyObject_TypeCheck(obj, state->TaskType))
96100
97- #ifdef Py_GIL_DISABLED
98- # define ASYNCIO_STATE_LOCK (state ) Py_BEGIN_CRITICAL_SECTION_MUT(&state->mutex)
99- # define ASYNCIO_STATE_UNLOCK (state ) Py_END_CRITICAL_SECTION()
100- #else
101- # define ASYNCIO_STATE_LOCK (state ) ((void)state)
102- # define ASYNCIO_STATE_UNLOCK (state ) ((void)state)
103- #endif
104-
105101typedef struct _Py_AsyncioModuleDebugOffsets {
106102 struct _asyncio_task_object {
107103 uint64_t size ;
@@ -135,9 +131,6 @@ GENERATE_DEBUG_SECTION(AsyncioDebug, Py_AsyncioModuleDebugOffsets AsyncioDebug)
135131
136132/* State of the _asyncio module */
137133typedef struct {
138- #ifdef Py_GIL_DISABLED
139- PyMutex mutex ;
140- #endif
141134 PyTypeObject * FutureIterType ;
142135 PyTypeObject * TaskStepMethWrapper_Type ;
143136 PyTypeObject * FutureType ;
@@ -184,11 +177,6 @@ typedef struct {
184177 /* Counter for autogenerated Task names */
185178 uint64_t task_name_counter ;
186179
187- /* Head of circular linked-list of all tasks which are instances of `asyncio.Task`
188- or subclasses of it. Third party tasks implementations which don't inherit from
189- `asyncio.Task` are tracked separately using the `non_asyncio_tasks` WeakSet.
190- */
191- struct llist_node asyncio_tasks_head ;
192180} asyncio_state ;
193181
194182static inline asyncio_state *
@@ -2179,16 +2167,15 @@ static PyMethodDef TaskWakeupDef = {
21792167static void
21802168register_task (asyncio_state * state , TaskObj * task )
21812169{
2182- ASYNCIO_STATE_LOCK (state );
21832170 assert (Task_Check (state , task ));
21842171 if (task -> task_node .next != NULL ) {
21852172 // already registered
21862173 assert (task -> task_node .prev != NULL );
2187- goto exit ;
2174+ return ;
21882175 }
2189- llist_insert_tail ( & state -> asyncio_tasks_head , & task -> task_node );
2190- exit :
2191- ASYNCIO_STATE_UNLOCK ( state );
2176+ _PyThreadStateImpl * tstate = ( _PyThreadStateImpl * ) _PyThreadState_GET ( );
2177+ struct llist_node * head = & tstate -> asyncio_tasks_head ;
2178+ llist_insert_tail ( head , & task -> task_node );
21922179}
21932180
21942181static int
@@ -2197,19 +2184,38 @@ register_eager_task(asyncio_state *state, PyObject *task)
21972184 return PySet_Add (state -> eager_tasks , task );
21982185}
21992186
2200- static void
2201- unregister_task ( asyncio_state * state , TaskObj * task )
2187+ static inline void
2188+ unregister_task_safe ( TaskObj * task )
22022189{
2203- ASYNCIO_STATE_LOCK (state );
2204- assert (Task_Check (state , task ));
22052190 if (task -> task_node .next == NULL ) {
22062191 // not registered
22072192 assert (task -> task_node .prev == NULL );
2208- goto exit ;
2193+ return ;
22092194 }
22102195 llist_remove (& task -> task_node );
2211- exit :
2212- ASYNCIO_STATE_UNLOCK (state );
2196+ }
2197+
2198+ static void
2199+ unregister_task (asyncio_state * state , TaskObj * task )
2200+ {
2201+ assert (Task_Check (state , task ));
2202+ #ifdef Py_GIL_DISABLED
2203+ // check if we are in the same thread
2204+ // if so, we can avoid locking
2205+ if (task -> task_tid == _Py_ThreadId ()) {
2206+ unregister_task_safe (task );
2207+ }
2208+ else {
2209+ // we are in a different thread
2210+ // stop the world then check and remove the task
2211+ PyThreadState * tstate = _PyThreadState_GET ();
2212+ _PyEval_StopTheWorld (tstate -> interp );
2213+ unregister_task_safe (task );
2214+ _PyEval_StartTheWorld (tstate -> interp );
2215+ }
2216+ #else
2217+ unregister_task_safe (task );
2218+ #endif
22132219}
22142220
22152221static int
@@ -2423,6 +2429,9 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop,
24232429 }
24242430
24252431 Py_CLEAR (self -> task_fut_waiter );
2432+ #ifdef Py_GIL_DISABLED
2433+ self -> task_tid = _Py_ThreadId ();
2434+ #endif
24262435 self -> task_must_cancel = 0 ;
24272436 self -> task_log_destroy_pending = 1 ;
24282437 self -> task_num_cancels_requested = 0 ;
@@ -3981,6 +3990,7 @@ _asyncio_current_task_impl(PyObject *module, PyObject *loop)
39813990static inline int
39823991add_one_task (asyncio_state * state , PyObject * tasks , PyObject * task , PyObject * loop )
39833992{
3993+ assert (PySet_CheckExact (tasks ));
39843994 PyObject * done = PyObject_CallMethodNoArgs (task , & _Py_ID (done ));
39853995 if (done == NULL ) {
39863996 return -1 ;
@@ -4003,6 +4013,57 @@ add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *lo
40034013 return 0 ;
40044014}
40054015
4016+ static inline int
4017+ add_tasks_llist (struct llist_node * head , PyListObject * tasks )
4018+ {
4019+ struct llist_node * node ;
4020+ llist_for_each_safe (node , head ) {
4021+ TaskObj * task = llist_data (node , TaskObj , task_node );
4022+ // The linked list holds borrowed references to task
4023+ // as such it is possible that the task is concurrently
4024+ // deallocated while added to this list.
4025+ // To protect against concurrent deallocations,
4026+ // we first try to incref the task which would fail
4027+ // if it is concurrently getting deallocated in another thread,
4028+ // otherwise it gets added to the list.
4029+ if (_Py_TryIncref ((PyObject * )task )) {
4030+ if (_PyList_AppendTakeRef (tasks , (PyObject * )task ) < 0 ) {
4031+ // do not call any escaping calls here while the world is stopped.
4032+ return -1 ;
4033+ }
4034+ }
4035+ }
4036+ return 0 ;
4037+ }
4038+
4039+ static inline int
4040+ add_tasks_interp (PyInterpreterState * interp , PyListObject * tasks )
4041+ {
4042+ #ifdef Py_GIL_DISABLED
4043+ assert (interp -> stoptheworld .world_stopped );
4044+ #endif
4045+ // Start traversing from interpreter's linked list
4046+ struct llist_node * head = & interp -> asyncio_tasks_head ;
4047+
4048+ if (add_tasks_llist (head , tasks ) < 0 ) {
4049+ return -1 ;
4050+ }
4051+
4052+ int ret = 0 ;
4053+ // traverse the task lists of thread states
4054+ _Py_FOR_EACH_TSTATE_BEGIN (interp , p ) {
4055+ _PyThreadStateImpl * ts = (_PyThreadStateImpl * )p ;
4056+ head = & ts -> asyncio_tasks_head ;
4057+ if (add_tasks_llist (head , tasks ) < 0 ) {
4058+ ret = -1 ;
4059+ goto exit ;
4060+ }
4061+ }
4062+ exit :
4063+ _Py_FOR_EACH_TSTATE_END (interp );
4064+ return ret ;
4065+ }
4066+
40064067/*********************** Module **************************/
40074068
40084069/*[clinic input]
@@ -4041,30 +4102,29 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop)
40414102 Py_DECREF (loop );
40424103 return NULL ;
40434104 }
4044- int err = 0 ;
4045- ASYNCIO_STATE_LOCK (state );
4046- struct llist_node * node ;
4047-
4048- llist_for_each_safe (node , & state -> asyncio_tasks_head ) {
4049- TaskObj * task = llist_data (node , TaskObj , task_node );
4050- // The linked list holds borrowed references to task
4051- // as such it is possible that the task is concurrently
4052- // deallocated while added to this list.
4053- // To protect against concurrent deallocations,
4054- // we first try to incref the task which would fail
4055- // if it is concurrently getting deallocated in another thread,
4056- // otherwise it gets added to the list.
4057- if (_Py_TryIncref ((PyObject * )task )) {
4058- if (_PyList_AppendTakeRef ((PyListObject * )tasks , (PyObject * )task ) < 0 ) {
4059- Py_DECREF (tasks );
4060- Py_DECREF (loop );
4061- err = 1 ;
4062- break ;
4063- }
4064- }
4065- }
4066- ASYNCIO_STATE_UNLOCK (state );
4067- if (err ) {
4105+ PyInterpreterState * interp = PyInterpreterState_Get ();
4106+ // Stop the world and traverse the per-thread linked list
4107+ // of asyncio tasks for every thread, as well as the
4108+ // interpreter's linked list, and add them to `tasks`.
4109+ // The interpreter linked list is used for any lingering tasks
4110+ // whose thread state has been deallocated while the task was
4111+ // still alive. This can happen if a task is referenced by
4112+ // a different thread, in which case the task is moved to
4113+ // the interpreter's linked list from the thread's linked
4114+ // list before deallocation. See PyThreadState_Clear.
4115+ //
4116+ // The stop-the-world pause is required so that no thread
4117+ // modifies its linked list while being iterated here
4118+ // in parallel. This design allows for lock-free
4119+ // register_task/unregister_task for loops running in parallel
4120+ // in different threads (the general case).
4121+ _PyEval_StopTheWorld (interp );
4122+ int ret = add_tasks_interp (interp , (PyListObject * )tasks );
4123+ _PyEval_StartTheWorld (interp );
4124+ if (ret < 0 ) {
4125+ // call any escaping calls after starting the world to avoid any deadlocks.
4126+ Py_DECREF (tasks );
4127+ Py_DECREF (loop );
40684128 return NULL ;
40694129 }
40704130 PyObject * scheduled_iter = PyObject_GetIter (state -> non_asyncio_tasks );
@@ -4348,7 +4408,6 @@ module_exec(PyObject *mod)
43484408{
43494409 asyncio_state * state = get_asyncio_state (mod );
43504410
4351- llist_init (& state -> asyncio_tasks_head );
43524411
43534412#define CREATE_TYPE (m , tp , spec , base ) \
43544413 do { \
0 commit comments