Skip to content

Commit 660f126

Browse files
gh-129898: per-thread current task implementation in asyncio (#129899)
Store the currently running task on the thread state. This makes it thread safe for the free-threading build and also improves performance: because there is no lock contention, the implementation is effectively lock-free. When accessing the current task of the loop running in the current thread, no locking is required. In the rare case of accessing the current task of a loop running in a different thread, a stop-the-world pause is used in free-threading builds to stop all other running threads and find the task for the specified loop. This also makes it easier for external introspection to find the current task, which will now always be correct.
1 parent fb17f41 commit 660f126

File tree

2 files changed

+72
-158
lines changed

2 files changed

+72
-158
lines changed

Lib/asyncio/tasks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1110,7 +1110,7 @@ def _unregister_eager_task(task):
11101110
from _asyncio import (_register_task, _register_eager_task,
11111111
_unregister_task, _unregister_eager_task,
11121112
_enter_task, _leave_task, _swap_current_task,
1113-
_scheduled_tasks, _eager_tasks, _current_tasks,
1113+
_scheduled_tasks, _eager_tasks,
11141114
current_task, all_tasks)
11151115
except ImportError:
11161116
pass

Modules/_asynciomodule.c

Lines changed: 71 additions & 157 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,6 @@ typedef struct {
139139
PyObject *asyncio_mod;
140140
PyObject *context_kwname;
141141

142-
/* Dictionary containing tasks that are currently active in
143-
all running event loops. {EventLoop: Task} */
144-
PyObject *current_tasks;
145-
146142
/* WeakSet containing scheduled 3rd party tasks which don't
147143
inherit from native asyncio.Task */
148144
PyObject *non_asyncio_tasks;
@@ -2061,8 +2057,6 @@ static int task_call_step_soon(asyncio_state *state, TaskObj *, PyObject *);
20612057
static PyObject * task_wakeup(TaskObj *, PyObject *);
20622058
static PyObject * task_step(asyncio_state *, TaskObj *, PyObject *);
20632059
static int task_eager_start(asyncio_state *state, TaskObj *task);
2064-
static inline void clear_ts_asyncio_running_task(PyObject *loop);
2065-
static inline void set_ts_asyncio_running_task(PyObject *loop, PyObject *task);
20662060

20672061
/* ----- Task._step wrapper */
20682062

@@ -2235,159 +2229,71 @@ unregister_eager_task(asyncio_state *state, PyObject *task)
22352229
}
22362230

22372231
static int
2238-
enter_task(asyncio_state *state, PyObject *loop, PyObject *task)
2232+
enter_task(PyObject *loop, PyObject *task)
22392233
{
2240-
PyObject *item;
2241-
int res = PyDict_SetDefaultRef(state->current_tasks, loop, task, &item);
2242-
if (res < 0) {
2234+
_PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET();
2235+
2236+
if (ts->asyncio_running_loop != loop) {
2237+
PyErr_Format(PyExc_RuntimeError, "loop %R is not the running loop", loop);
22432238
return -1;
22442239
}
2245-
else if (res == 1) {
2240+
2241+
if (ts->asyncio_running_task != NULL) {
22462242
PyErr_Format(
22472243
PyExc_RuntimeError,
22482244
"Cannot enter into task %R while another " \
22492245
"task %R is being executed.",
2250-
task, item, NULL);
2251-
Py_DECREF(item);
2246+
task, ts->asyncio_running_task, NULL);
22522247
return -1;
22532248
}
22542249

2255-
assert(task == item);
2256-
Py_CLEAR(item);
2257-
set_ts_asyncio_running_task(loop, task);
2250+
ts->asyncio_running_task = Py_NewRef(task);
22582251
return 0;
22592252
}
22602253

22612254
static int
2262-
err_leave_task(PyObject *item, PyObject *task)
2255+
leave_task(PyObject *loop, PyObject *task)
22632256
{
2264-
PyErr_Format(
2265-
PyExc_RuntimeError,
2266-
"Leaving task %R does not match the current task %R.",
2267-
task, item);
2268-
return -1;
2269-
}
2270-
2271-
static int
2272-
leave_task_predicate(PyObject *item, void *task)
2273-
{
2274-
if (item != task) {
2275-
return err_leave_task(item, (PyObject *)task);
2276-
}
2277-
return 1;
2278-
}
2257+
_PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET();
22792258

2280-
static int
2281-
leave_task(asyncio_state *state, PyObject *loop, PyObject *task)
2282-
{
2283-
int res = _PyDict_DelItemIf(state->current_tasks, loop,
2284-
leave_task_predicate, task);
2285-
if (res == 0) {
2286-
// task was not found
2287-
return err_leave_task(Py_None, task);
2259+
if (ts->asyncio_running_loop != loop) {
2260+
PyErr_Format(PyExc_RuntimeError, "loop %R is not the running loop", loop);
2261+
return -1;
22882262
}
2289-
clear_ts_asyncio_running_task(loop);
2290-
return res;
2291-
}
22922263

2293-
static PyObject *
2294-
swap_current_task_lock_held(PyDictObject *current_tasks, PyObject *loop,
2295-
Py_hash_t hash, PyObject *task)
2296-
{
2297-
PyObject *prev_task;
2298-
if (_PyDict_GetItemRef_KnownHash_LockHeld(current_tasks, loop, hash, &prev_task) < 0) {
2299-
return NULL;
2300-
}
2301-
if (_PyDict_SetItem_KnownHash_LockHeld(current_tasks, loop, task, hash) < 0) {
2302-
Py_XDECREF(prev_task);
2303-
return NULL;
2304-
}
2305-
if (prev_task == NULL) {
2306-
Py_RETURN_NONE;
2264+
if (ts->asyncio_running_task != task) {
2265+
PyErr_Format(
2266+
PyExc_RuntimeError,
2267+
"Invalid attempt to leave task %R while " \
2268+
"task %R is entered.",
2269+
task, ts->asyncio_running_task ? ts->asyncio_running_task : Py_None, NULL);
2270+
return -1;
23072271
}
2308-
return prev_task;
2272+
Py_CLEAR(ts->asyncio_running_task);
2273+
return 0;
23092274
}
23102275

23112276
static PyObject *
2312-
swap_current_task(asyncio_state *state, PyObject *loop, PyObject *task)
2277+
swap_current_task(PyObject *loop, PyObject *task)
23132278
{
2314-
PyObject *prev_task;
2315-
2316-
clear_ts_asyncio_running_task(loop);
2317-
if (task == Py_None) {
2318-
if (PyDict_Pop(state->current_tasks, loop, &prev_task) < 0) {
2319-
return NULL;
2320-
}
2321-
if (prev_task == NULL) {
2322-
Py_RETURN_NONE;
2323-
}
2324-
return prev_task;
2325-
}
2279+
_PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET();
23262280

2327-
Py_hash_t hash = PyObject_Hash(loop);
2328-
if (hash == -1) {
2281+
if (ts->asyncio_running_loop != loop) {
2282+
PyErr_Format(PyExc_RuntimeError, "loop %R is not the running loop", loop);
23292283
return NULL;
23302284
}
23312285

2332-
PyDictObject *current_tasks = (PyDictObject *)state->current_tasks;
2333-
Py_BEGIN_CRITICAL_SECTION(current_tasks);
2334-
prev_task = swap_current_task_lock_held(current_tasks, loop, hash, task);
2335-
Py_END_CRITICAL_SECTION();
2336-
set_ts_asyncio_running_task(loop, task);
2337-
return prev_task;
2338-
}
2339-
2340-
static inline void
2341-
set_ts_asyncio_running_task(PyObject *loop, PyObject *task)
2342-
{
2343-
// We want to enable debuggers and profilers to be able to quickly
2344-
// introspect the asyncio running state from another process.
2345-
// When we do that, we need to essentially traverse the address space
2346-
// of a Python process and understand what every Python thread in it is
2347-
// currently doing, mainly:
2348-
//
2349-
// * current frame
2350-
// * current asyncio task
2351-
//
2352-
// A naive solution would be to require profilers and debuggers to
2353-
// find the current task in the "_asynciomodule" module state, but
2354-
// unfortunately that would require a lot of complicated remote
2355-
// memory reads and logic, as Python's dict is a notoriously complex
2356-
// and ever-changing data structure.
2357-
//
2358-
// So the easier solution is to put a strong reference to the currently
2359-
// running `asyncio.Task` on the current thread state (the current loop
2360-
// is also stored there.)
2361-
_PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET();
2362-
if (ts->asyncio_running_loop == loop) {
2363-
// Protect from a situation when someone calls this method
2364-
// from another thread. This shouldn't ever happen though,
2365-
// as `enter_task` and `leave_task` can either be called by:
2366-
//
2367-
// - `asyncio.Task` itself, in `Task.__step()`. That method
2368-
// can only be called by the event loop itself.
2369-
//
2370-
// - third-party Task "from scratch" implementations, that
2371-
// our `capture_call_graph` API doesn't support anyway.
2372-
//
2373-
// That said, we still want to make sure we don't end up in
2374-
// a broken state, so we check that we're in the correct thread
2375-
// by comparing the *loop* argument to the event loop running
2376-
// in the current thread. If they match we know we're in the
2377-
// right thread, as asyncio event loops don't change threads.
2378-
assert(ts->asyncio_running_task == NULL);
2286+
/* transfer ownership to avoid redundant ref counting */
2287+
PyObject *prev_task = ts->asyncio_running_task;
2288+
if (task != Py_None) {
23792289
ts->asyncio_running_task = Py_NewRef(task);
2290+
} else {
2291+
ts->asyncio_running_task = NULL;
23802292
}
2381-
}
2382-
2383-
static inline void
2384-
clear_ts_asyncio_running_task(PyObject *loop)
2385-
{
2386-
// See comment in set_ts_asyncio_running_task() for details.
2387-
_PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET();
2388-
if (ts->asyncio_running_loop == NULL || ts->asyncio_running_loop == loop) {
2389-
Py_CLEAR(ts->asyncio_running_task);
2293+
if (prev_task == NULL) {
2294+
Py_RETURN_NONE;
23902295
}
2296+
return prev_task;
23912297
}
23922298

23932299
/* ----- Task */
@@ -3539,20 +3445,20 @@ task_step(asyncio_state *state, TaskObj *task, PyObject *exc)
35393445
{
35403446
PyObject *res;
35413447

3542-
if (enter_task(state, task->task_loop, (PyObject*)task) < 0) {
3448+
if (enter_task(task->task_loop, (PyObject*)task) < 0) {
35433449
return NULL;
35443450
}
35453451

35463452
res = task_step_impl(state, task, exc);
35473453

35483454
if (res == NULL) {
35493455
PyObject *exc = PyErr_GetRaisedException();
3550-
leave_task(state, task->task_loop, (PyObject*)task);
3456+
leave_task(task->task_loop, (PyObject*)task);
35513457
_PyErr_ChainExceptions1(exc);
35523458
return NULL;
35533459
}
35543460
else {
3555-
if (leave_task(state, task->task_loop, (PyObject*)task) < 0) {
3461+
if (leave_task(task->task_loop, (PyObject*)task) < 0) {
35563462
Py_DECREF(res);
35573463
return NULL;
35583464
}
@@ -3566,7 +3472,7 @@ static int
35663472
task_eager_start(asyncio_state *state, TaskObj *task)
35673473
{
35683474
assert(task != NULL);
3569-
PyObject *prevtask = swap_current_task(state, task->task_loop, (PyObject *)task);
3475+
PyObject *prevtask = swap_current_task(task->task_loop, (PyObject *)task);
35703476
if (prevtask == NULL) {
35713477
return -1;
35723478
}
@@ -3595,7 +3501,7 @@ task_eager_start(asyncio_state *state, TaskObj *task)
35953501
Py_DECREF(stepres);
35963502
}
35973503

3598-
PyObject *curtask = swap_current_task(state, task->task_loop, prevtask);
3504+
PyObject *curtask = swap_current_task(task->task_loop, prevtask);
35993505
Py_DECREF(prevtask);
36003506
if (curtask == NULL) {
36013507
retval = -1;
@@ -3907,8 +3813,7 @@ static PyObject *
39073813
_asyncio__enter_task_impl(PyObject *module, PyObject *loop, PyObject *task)
39083814
/*[clinic end generated code: output=a22611c858035b73 input=de1b06dca70d8737]*/
39093815
{
3910-
asyncio_state *state = get_asyncio_state(module);
3911-
if (enter_task(state, loop, task) < 0) {
3816+
if (enter_task(loop, task) < 0) {
39123817
return NULL;
39133818
}
39143819
Py_RETURN_NONE;
@@ -3932,8 +3837,7 @@ static PyObject *
39323837
_asyncio__leave_task_impl(PyObject *module, PyObject *loop, PyObject *task)
39333838
/*[clinic end generated code: output=0ebf6db4b858fb41 input=51296a46313d1ad8]*/
39343839
{
3935-
asyncio_state *state = get_asyncio_state(module);
3936-
if (leave_task(state, loop, task) < 0) {
3840+
if (leave_task(loop, task) < 0) {
39373841
return NULL;
39383842
}
39393843
Py_RETURN_NONE;
@@ -3957,7 +3861,7 @@ _asyncio__swap_current_task_impl(PyObject *module, PyObject *loop,
39573861
PyObject *task)
39583862
/*[clinic end generated code: output=9f88de958df74c7e input=c9c72208d3d38b6c]*/
39593863
{
3960-
return swap_current_task(get_asyncio_state(module), loop, task);
3864+
return swap_current_task(loop, task);
39613865
}
39623866

39633867

@@ -3974,9 +3878,6 @@ static PyObject *
39743878
_asyncio_current_task_impl(PyObject *module, PyObject *loop)
39753879
/*[clinic end generated code: output=fe15ac331a7f981a input=58910f61a5627112]*/
39763880
{
3977-
PyObject *ret;
3978-
asyncio_state *state = get_asyncio_state(module);
3979-
39803881
if (loop == Py_None) {
39813882
loop = _asyncio_get_running_loop_impl(module);
39823883
if (loop == NULL) {
@@ -3986,11 +3887,36 @@ _asyncio_current_task_impl(PyObject *module, PyObject *loop)
39863887
Py_INCREF(loop);
39873888
}
39883889

3989-
int rc = PyDict_GetItemRef(state->current_tasks, loop, &ret);
3990-
Py_DECREF(loop);
3991-
if (rc == 0) {
3890+
_PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET();
3891+
// Fast path for the current running loop of current thread
3892+
// no locking or stop the world pause is required
3893+
if (ts->asyncio_running_loop == loop) {
3894+
if (ts->asyncio_running_task != NULL) {
3895+
Py_DECREF(loop);
3896+
return Py_NewRef(ts->asyncio_running_task);
3897+
}
3898+
Py_DECREF(loop);
39923899
Py_RETURN_NONE;
39933900
}
3901+
3902+
PyObject *ret = Py_None;
3903+
// Stop the world and traverse the per-thread current tasks
3904+
// and return the task if the loop matches
3905+
PyInterpreterState *interp = ts->base.interp;
3906+
_PyEval_StopTheWorld(interp);
3907+
_Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
3908+
ts = (_PyThreadStateImpl *)p;
3909+
if (ts->asyncio_running_loop == loop) {
3910+
if (ts->asyncio_running_task != NULL) {
3911+
ret = Py_NewRef(ts->asyncio_running_task);
3912+
}
3913+
goto exit;
3914+
}
3915+
}
3916+
exit:
3917+
_Py_FOR_EACH_TSTATE_END(interp);
3918+
_PyEval_StartTheWorld(interp);
3919+
Py_DECREF(loop);
39943920
return ret;
39953921
}
39963922

@@ -4258,7 +4184,6 @@ module_traverse(PyObject *mod, visitproc visit, void *arg)
42584184

42594185
Py_VISIT(state->non_asyncio_tasks);
42604186
Py_VISIT(state->eager_tasks);
4261-
Py_VISIT(state->current_tasks);
42624187
Py_VISIT(state->iscoroutine_typecache);
42634188

42644189
Py_VISIT(state->context_kwname);
@@ -4289,7 +4214,6 @@ module_clear(PyObject *mod)
42894214

42904215
Py_CLEAR(state->non_asyncio_tasks);
42914216
Py_CLEAR(state->eager_tasks);
4292-
Py_CLEAR(state->current_tasks);
42934217
Py_CLEAR(state->iscoroutine_typecache);
42944218

42954219
Py_CLEAR(state->context_kwname);
@@ -4319,11 +4243,6 @@ module_init(asyncio_state *state)
43194243
goto fail;
43204244
}
43214245

4322-
state->current_tasks = PyDict_New();
4323-
if (state->current_tasks == NULL) {
4324-
goto fail;
4325-
}
4326-
43274246
state->iscoroutine_typecache = PySet_New(NULL);
43284247
if (state->iscoroutine_typecache == NULL) {
43294248
goto fail;
@@ -4456,11 +4375,6 @@ module_exec(PyObject *mod)
44564375
return -1;
44574376
}
44584377

4459-
if (PyModule_AddObjectRef(mod, "_current_tasks", state->current_tasks) < 0) {
4460-
return -1;
4461-
}
4462-
4463-
44644378
return 0;
44654379
}
44664380

0 commit comments

Comments
 (0)