From eac05aa37f1c6e37139fb36dd8f972fe2cc1e2d0 Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Thu, 2 Jan 2025 13:09:46 +0000 Subject: [PATCH 1/2] switch to per loop current task --- .../pycore_global_objects_fini_generated.h | 1 + Include/internal/pycore_global_strings.h | 1 + .../internal/pycore_runtime_init_generated.h | 1 + .../internal/pycore_unicodeobject_generated.h | 4 + Lib/asyncio/tasks.py | 49 +++---- Modules/_asynciomodule.c | 122 +++++++----------- 6 files changed, 78 insertions(+), 100 deletions(-) diff --git a/Include/internal/pycore_global_objects_fini_generated.h b/Include/internal/pycore_global_objects_fini_generated.h index 90214a314031d1..cf249f2d2f188f 100644 --- a/Include/internal/pycore_global_objects_fini_generated.h +++ b/Include/internal/pycore_global_objects_fini_generated.h @@ -745,6 +745,7 @@ _PyStaticObjects_CheckRefcnt(PyInterpreterState *interp) { _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(_blksize)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(_bootstrap)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(_check_retval_)); + _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(_current_task)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(_dealloc_warn)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(_feature_version)); _PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(_field_types)); diff --git a/Include/internal/pycore_global_strings.h b/Include/internal/pycore_global_strings.h index 97a75d0c46c867..e20741d2f2cc91 100644 --- a/Include/internal/pycore_global_strings.h +++ b/Include/internal/pycore_global_strings.h @@ -234,6 +234,7 @@ struct _Py_global_strings { STRUCT_FOR_ID(_blksize) STRUCT_FOR_ID(_bootstrap) STRUCT_FOR_ID(_check_retval_) + STRUCT_FOR_ID(_current_task) STRUCT_FOR_ID(_dealloc_warn) STRUCT_FOR_ID(_feature_version) STRUCT_FOR_ID(_field_types) diff --git a/Include/internal/pycore_runtime_init_generated.h b/Include/internal/pycore_runtime_init_generated.h index 4f928cc050bf8e..d0edc91a074355 100644 --- a/Include/internal/pycore_runtime_init_generated.h +++ b/Include/internal/pycore_runtime_init_generated.h @@ -743,6 +743,7 @@ extern "C" { INIT_ID(_blksize), \ INIT_ID(_bootstrap), \ INIT_ID(_check_retval_), \ + INIT_ID(_current_task), \ INIT_ID(_dealloc_warn), \ INIT_ID(_feature_version), \ INIT_ID(_field_types), \ diff --git a/Include/internal/pycore_unicodeobject_generated.h b/Include/internal/pycore_unicodeobject_generated.h index 5b78d038fc1192..e3c969ea70e764 100644 --- a/Include/internal/pycore_unicodeobject_generated.h +++ b/Include/internal/pycore_unicodeobject_generated.h @@ -732,6 +732,10 @@ _PyUnicode_InitStaticStrings(PyInterpreterState *interp) { _PyUnicode_InternStatic(interp, &string); assert(_PyUnicode_CheckConsistency(string, 1)); assert(PyUnicode_GET_LENGTH(string) != 1); + string = &_Py_ID(_current_task); + _PyUnicode_InternStatic(interp, &string); + assert(_PyUnicode_CheckConsistency(string, 1)); + assert(PyUnicode_GET_LENGTH(string) != 1); string = &_Py_ID(_dealloc_warn); _PyUnicode_InternStatic(interp, &string); assert(_PyUnicode_CheckConsistency(string, 1)); diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 2112dd4b99d17f..197478f4b7d92f 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -38,7 +38,11 @@ def current_task(loop=None): """Return a currently executed task.""" if loop is None: loop = events.get_running_loop() - return _current_tasks.get(loop) + + try: + return loop._current_task + except AttributeError: + return None def all_tasks(loop=None): @@ -1024,10 +1028,6 @@ def factory(loop, coro, *, name=None, context=None): _scheduled_tasks = weakref.WeakSet() _eager_tasks = set() -# Dictionary containing tasks that are currently active in -# all running event loops. {EventLoop: Task} -_current_tasks = {} - def _register_task(task): """Register an asyncio Task scheduled to run on an event loop.""" @@ -1040,29 +1040,32 @@ def _register_eager_task(task): def _enter_task(loop, task): - current_task = _current_tasks.get(loop) - if current_task is not None: - raise RuntimeError(f"Cannot enter into task {task!r} while another " - f"task {current_task!r} is being executed.") - _current_tasks[loop] = task + try: + if loop._current_task is not None: + raise RuntimeError(f"Cannot enter into task {task!r} while another " + f"task {loop._current_task!r} is being executed.") + except AttributeError: + pass + loop._current_task = task def _leave_task(loop, task): - current_task = _current_tasks.get(loop) - if current_task is not task: - raise RuntimeError(f"Leaving task {task!r} does not match " - f"the current task {current_task!r}.") - del _current_tasks[loop] + try: + if loop._current_task is not task: + raise RuntimeError(f"Leaving task {task!r} does not match " + f"the current task {loop._current_task!r}.") + except AttributeError: + pass + loop._current_task = None def _swap_current_task(loop, task): - prev_task = _current_tasks.get(loop) - if task is None: - del _current_tasks[loop] - else: - _current_tasks[loop] = task - return prev_task - + try: + prev_task = loop._current_task + loop._current_task = task + return prev_task + except AttributeError: + loop._current_task = task def _unregister_task(task): """Unregister a completed, scheduled Task.""" @@ -1088,7 +1091,7 @@ def _unregister_eager_task(task): from _asyncio import (_register_task, _register_eager_task, _unregister_task, _unregister_eager_task, _enter_task, _leave_task, _swap_current_task, - _scheduled_tasks, _eager_tasks, _current_tasks, + _scheduled_tasks, _eager_tasks, current_task, all_tasks) except ImportError: pass diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index 74db4c74af905a..87f08e2561934b 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -97,10 +97,6 @@ typedef struct { PyObject *asyncio_mod; PyObject *context_kwname; - /* Dictionary containing tasks that are currently active in - all running event loops. {EventLoop: Task} */ - PyObject *current_tasks; - /* WeakSet containing scheduled 3rd party tasks which don't inherit from native asyncio.Task */ PyObject *non_asyncio_tasks; @@ -1926,11 +1922,10 @@ static int enter_task(asyncio_state *state, PyObject *loop, PyObject *task) { PyObject *item; - int res = PyDict_SetDefaultRef(state->current_tasks, loop, task, &item); - if (res < 0) { + if (PyObject_GetOptionalAttr(loop, &_Py_ID(_current_task), &item) < 0) { return -1; } - else if (res == 1) { + if (item != NULL && item != Py_None) { PyErr_Format( PyExc_RuntimeError, "Cannot enter into task %R while another " \ @@ -1939,84 +1934,63 @@ enter_task(asyncio_state *state, PyObject *loop, PyObject *task) Py_DECREF(item); return -1; } - Py_DECREF(item); - return 0; -} - -static int -err_leave_task(PyObject *item, PyObject *task) -{ - PyErr_Format( - PyExc_RuntimeError, - "Leaving task %R does not match the current task %R.", - task, item); - return -1; -} -static int -leave_task_predicate(PyObject *item, void *task) -{ - if (item != task) { - return err_leave_task(item, (PyObject *)task); + if (PyObject_SetAttr(loop, &_Py_ID(_current_task), task) < 0) { + return -1; } - return 1; + return 0; } static int leave_task(asyncio_state *state, PyObject *loop, PyObject *task) -/*[clinic end generated code: output=0ebf6db4b858fb41 input=51296a46313d1ad8]*/ { - int res = _PyDict_DelItemIf(state->current_tasks, loop, - leave_task_predicate, task); - if (res == 0) { - // task was not found - return err_leave_task(Py_None, task); + PyObject *item; + if (PyObject_GetOptionalAttr(loop, &_Py_ID(_current_task), &item) < 0) { + return -1; } - return res; -} -static PyObject * -swap_current_task_lock_held(PyDictObject *current_tasks, PyObject *loop, - Py_hash_t hash, PyObject *task) -{ - PyObject *prev_task; - if (_PyDict_GetItemRef_KnownHash_LockHeld(current_tasks, loop, hash, &prev_task) < 0) { - return NULL; + if (item == NULL || item == Py_None) { + // current task is not set + PyErr_Format(PyExc_RuntimeError, + "Leaving task %R does not match the current task %R.", + task, Py_None); + return -1; } - if (_PyDict_SetItem_KnownHash_LockHeld(current_tasks, loop, task, hash) < 0) { - Py_XDECREF(prev_task); - return NULL; + + if (item != task) { + // different task + PyErr_Format(PyExc_RuntimeError, + "Leaving task %R does not match the current task %R.", + task, item); + Py_DECREF(item); + return -1; } - if (prev_task == NULL) { - Py_RETURN_NONE; + Py_DECREF(item); + + if (PyObject_SetAttr(loop, &_Py_ID(_current_task), Py_None) < 0) { + return -1; } - return prev_task; + return 0; } + static PyObject * swap_current_task(asyncio_state *state, PyObject *loop, PyObject *task) { PyObject *prev_task; - if (task == Py_None) { - if (PyDict_Pop(state->current_tasks, loop, &prev_task) < 0) { - return NULL; - } - if (prev_task == NULL) { - Py_RETURN_NONE; - } - return prev_task; + if (PyObject_GetOptionalAttr(loop, &_Py_ID(_current_task), &prev_task) < 0) { + return NULL; } - Py_hash_t hash = PyObject_Hash(loop); - if (hash == -1) { + if (PyObject_SetAttr(loop, &_Py_ID(_current_task), task) < 0) { + Py_XDECREF(prev_task); return NULL; } - PyDictObject *current_tasks = (PyDictObject *)state->current_tasks; - Py_BEGIN_CRITICAL_SECTION(current_tasks); - prev_task = swap_current_task_lock_held(current_tasks, loop, hash, task); - Py_END_CRITICAL_SECTION(); + if (prev_task == NULL) { + Py_RETURN_NONE; + } return prev_task; } @@ -3503,9 +3477,6 @@ static PyObject * _asyncio_current_task_impl(PyObject *module, PyObject *loop) /*[clinic end generated code: output=fe15ac331a7f981a input=58910f61a5627112]*/ { - PyObject *ret; - asyncio_state *state = get_asyncio_state(module); - if (loop == Py_None) { loop = _asyncio_get_running_loop_impl(module); if (loop == NULL) { @@ -3515,12 +3486,19 @@ _asyncio_current_task_impl(PyObject *module, PyObject *loop) Py_INCREF(loop); } - int rc = PyDict_GetItemRef(state->current_tasks, loop, &ret); + PyObject *item; + if (PyObject_GetOptionalAttr(loop, &_Py_ID(_current_task), &item) < 0) { + Py_DECREF(loop); + return NULL; + } + Py_DECREF(loop); - if (rc == 0) { + + if (item == NULL) { Py_RETURN_NONE; } - return ret; + + return item; } @@ -3675,7 +3653,6 @@ module_traverse(PyObject *mod, visitproc visit, void *arg) Py_VISIT(state->non_asyncio_tasks); Py_VISIT(state->eager_tasks); - Py_VISIT(state->current_tasks); Py_VISIT(state->iscoroutine_typecache); Py_VISIT(state->context_kwname); @@ -3706,7 +3683,6 @@ module_clear(PyObject *mod) Py_CLEAR(state->non_asyncio_tasks); Py_CLEAR(state->eager_tasks); - Py_CLEAR(state->current_tasks); Py_CLEAR(state->iscoroutine_typecache); Py_CLEAR(state->context_kwname); @@ -3735,10 +3711,6 @@ module_init(asyncio_state *state) goto fail; } - state->current_tasks = PyDict_New(); - if (state->current_tasks == NULL) { - goto fail; - } state->iscoroutine_typecache = PySet_New(NULL); if (state->iscoroutine_typecache == NULL) { @@ -3872,10 +3844,6 @@ module_exec(PyObject *mod) return -1; } - if (PyModule_AddObjectRef(mod, "_current_tasks", state->current_tasks) < 0) { - return -1; - } - return 0; } From b20a605b7348e1e1c656519275405ffa3f5778a3 Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Thu, 2 Jan 2025 14:33:04 +0000 Subject: [PATCH 2/2] it works now --- Lib/asyncio/tasks.py | 6 ++++-- Lib/test/test_asyncio/test_tasks.py | 21 +++++++++++++++++---- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 197478f4b7d92f..22eb4955740220 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -1055,8 +1055,10 @@ def _leave_task(loop, task): raise RuntimeError(f"Leaving task {task!r} does not match " f"the current task {loop._current_task!r}.") except AttributeError: - pass - loop._current_task = None + raise RuntimeError(f"Leaving task {task!r} does not match " + f"the current task.") + else: + loop._current_task = None def _swap_current_task(loop, task): diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index b5363226ad79f4..ecb1f0f01acd08 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -3002,7 +3002,9 @@ def done(self): def test__enter_task(self): task = mock.Mock() - loop = mock.Mock() + class LoopLike: + pass + loop = LoopLike() self.assertIsNone(asyncio.current_task(loop)) self._enter_task(loop, task) self.assertIs(asyncio.current_task(loop), task) @@ -3011,7 +3013,9 @@ def test__enter_task(self): def test__enter_task_failure(self): task1 = mock.Mock() task2 = mock.Mock() - loop = mock.Mock() + class LoopLike: + pass + loop = LoopLike() self._enter_task(loop, task1) with self.assertRaises(RuntimeError): self._enter_task(loop, task2) @@ -3021,6 +3025,9 @@ def test__enter_task_failure(self): def test__leave_task(self): task = mock.Mock() loop = mock.Mock() + class LoopLike: + pass + loop = LoopLike() self._enter_task(loop, task) self._leave_task(loop, task) self.assertIsNone(asyncio.current_task(loop)) @@ -3028,7 +3035,10 @@ def test__leave_task(self): def test__leave_task_failure1(self): task1 = mock.Mock() task2 = mock.Mock() - loop = mock.Mock() + + class LoopLike: + pass + loop = LoopLike() self._enter_task(loop, task1) with self.assertRaises(RuntimeError): self._leave_task(loop, task2) @@ -3037,7 +3047,10 @@ def test__leave_task_failure1(self): def test__leave_task_failure2(self): task = mock.Mock() - loop = mock.Mock() + + class LoopLike: + pass + loop = LoopLike() with self.assertRaises(RuntimeError): self._leave_task(loop, task) self.assertIsNone(asyncio.current_task(loop))