diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index a7fb55982abe9c..f49035b0a17aba 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -768,6 +768,14 @@ def _get_running_loop(): This is a low-level function intended to be used by event loops. This function is thread-specific. """ + # Prefer C implementation if available + try: + if _c__get_running_loop is not _py__get_running_loop: + return _c__get_running_loop() + except NameError: + # C implementation not available + pass + # NOTE: this function is implemented in C (see _asynciomodule.c) running_loop, pid = _running_loop.loop_pid if running_loop is not None and pid == os.getpid(): @@ -783,6 +791,14 @@ def _set_running_loop(loop): # NOTE: this function is implemented in C (see _asynciomodule.c) _running_loop.loop_pid = (loop, os.getpid()) + # Keep C implementation in sync if available + try: + if _c__set_running_loop is not _py__set_running_loop: + _c__set_running_loop(loop) + except NameError: + # C implementation not available + pass + def _init_event_loop_policy(): global _event_loop_policy diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index fbd5c39a7c56ac..b9d3f2a8d3fd14 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -38,6 +38,12 @@ def current_task(loop=None): """Return a currently executed task.""" if loop is None: loop = events.get_running_loop() + + # If we have a C implementation, prefer it + if current_task is not _py_current_task: + return _c_current_task(loop) + + # Fall back to Python state return _current_tasks.get(loop) @@ -1066,6 +1072,9 @@ def factory(loop, coro, *, eager_start=True, **kwargs): # all running event loops. {EventLoop: Task} _current_tasks = {} +# Initialize C function references to Python implementations +_c_current_task = None + def _register_task(task): """Register an asyncio Task scheduled to run on an event loop.""" @@ -1083,6 +1092,9 @@ def _enter_task(loop, task): raise RuntimeError(f"Cannot enter into task {task!r} while another " f"task {current_task!r} is being executed.") _current_tasks[loop] = task + if _c_swap_current_task is not _py_swap_current_task: + # Keep the C task state in sync + _c_swap_current_task(loop, task) def _leave_task(loop, task): @@ -1091,6 +1103,9 @@ def _leave_task(loop, task): raise RuntimeError(f"Leaving task {task!r} does not match " f"the current task {current_task!r}.") del _current_tasks[loop] + if _c_swap_current_task is not _py_swap_current_task: + # Keep the C task state in sync + _c_swap_current_task(loop, None) def _swap_current_task(loop, task): @@ -1099,6 +1114,9 @@ def _swap_current_task(loop, task): del _current_tasks[loop] else: _current_tasks[loop] = task + if _c_swap_current_task is not _py_swap_current_task: + # Keep the C task state in sync + _c_swap_current_task(loop, task) return prev_task @@ -1122,6 +1140,9 @@ def _unregister_eager_task(task): _py_swap_current_task = _swap_current_task _py_all_tasks = all_tasks +# Initially point C functions to Python implementations +_c_current_task = current_task + try: from _asyncio import (_register_task, _register_eager_task, _unregister_task, _unregister_eager_task, diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 919d543b0329e9..6d553379191df3 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -2988,6 +2988,82 @@ def tearDown(self): asyncio.Task = asyncio.tasks.Task = self._Task_saved asyncio.Future = asyncio.futures.Future = self._Future_saved + def test_py_set_running_loop_c_get_sync(self): + """Test that _py_set_running_loop synchronizes with C _get_running_loop. + + This verifies that when Python sets the running loop, both Python + and C implementations can retrieve it correctly. + """ + py_set = events._py__set_running_loop + c_set = self._set_running_loop_saved + py_get = events._py__get_running_loop + c_get = self._get_running_loop_saved + if py_set is c_set: + self.skipTest("C _asyncio implementation not available") + + old_running_loop = events._get_running_loop() + try: + # Clear any existing loop using Python implementation + py_set(None) + self.assertIsNone(py_get()) + self.assertIsNone(c_get()) + + # Create test loop and set it using Python implementation + test_loop = asyncio.new_event_loop() + py_set(test_loop) + + # Verify both Python and C implementations see the same loop + self.assertIs(py_get(), test_loop) + self.assertIs(c_get(), test_loop) + + # Clear and verify both see None + py_set(None) + self.assertIsNone(py_get()) + self.assertIsNone(c_get()) + + test_loop.close() + + finally: + events._set_running_loop(old_running_loop) + + def test_c_set_running_loop_py_get_sync(self): + """Test that C _set_running_loop synchronizes with _py_get_running_loop. + + This verifies that when C sets the running loop, both C and Python + implementations can retrieve it correctly. + """ + py_set = events._py__set_running_loop + c_set = self._set_running_loop_saved + py_get = events._py__get_running_loop + c_get = self._get_running_loop_saved + if py_set is c_set: + self.skipTest("C _asyncio implementation not available") + + old_running_loop = events._get_running_loop() + try: + # Clear any existing loop using C implementation + c_set(None) + self.assertIsNone(c_get()) + self.assertIsNone(py_get()) + + # Create test loop and set it using C implementation + test_loop = asyncio.new_event_loop() + c_set(test_loop) + + # Verify both C and Python implementations see the same loop + self.assertIs(c_get(), test_loop) + self.assertIs(py_get(), test_loop) + + # Clear and verify both see None + c_set(None) + self.assertIsNone(c_get()) + self.assertIsNone(py_get()) + + test_loop.close() + + finally: + events._set_running_loop(old_running_loop) + if sys.platform != 'win32': def test_get_event_loop_new_process(self): # bpo-32126: The multiprocessing module used by @@ -2998,6 +3074,15 @@ def test_get_event_loop_new_process(self): self.addCleanup(multiprocessing_cleanup_tests) async def main(): + # Test that current_task() works correctly with _PyTask + # This demonstrates the C/Python task sync problem + try: + current = asyncio.current_task() + self.assertIsNotNone(current, + "current_task() should not return None when called from _PyTask") + except RuntimeError as e: + self.fail(f"current_task() failed when called from _PyTask: {e}") + if multiprocessing.get_start_method() == 'fork': # Avoid 'fork' DeprecationWarning. mp_context = multiprocessing.get_context('forkserver') diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index 931a43816a257a..47b2711a628cf2 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -3710,6 +3710,105 @@ async def coro(): self.assertEqual(result, 11) +class CurrentTaskTestsMixin: + """Tests for current_task() function working with a mixture of _PyTask + and _CTask instances. + """ + + def setUp(self): + super().setUp() + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def tearDown(self): + try: + super().tearDown() + finally: + self.loop.close() + asyncio.set_event_loop(None) + + async def _test_coro(self): + # Get the current task - this should not return None + current = asyncio.current_task() + self.assertIsNotNone(current, "current_task() should not return None") + return current + + + def current_task(self, eager=False): + """Test that current_task() works correctly with _PyTask + + This is a regression test for an issue where current_task() would return + None when called from within a _PyTask + The issue was caused by incomplete synchronization between the C and + Python asyncio implementations. + """ + + # Test with eager execution (common case where the bug occurred) + task = self.Task(self._test_coro(), loop=self.loop, eager_start=eager) + result = self.loop.run_until_complete(task) + self.assertIs(result, task) + + def test_current_task_eager(self): + """Test current_task() with eager execution.""" + self.current_task(eager=True) + + def test_current_task(self): + """Test current_task() without eager execution.""" + self.current_task(eager=False) + + def test_current_task_consistency_after_task_switch(self): + """Test that current_task() remains consistent during task switching. + + This tests the synchronization between C and Python implementations + when tasks are swapped in and out of execution. + """ + results = [] + + async def task_a(): + results.append(('task_a_start', asyncio.current_task())) + await asyncio.sleep(0) # Yield control + results.append(('task_a_end', asyncio.current_task())) + return "A" + + async def task_b(): + results.append(('task_b_start', asyncio.current_task())) + await asyncio.sleep(0) # Yield control + results.append(('task_b_end', asyncio.current_task())) + return "B" + + async def main(): + # Start both tasks concurrently + a = self.Task(task_a()) + b = self.Task(task_b()) + + return await asyncio.gather(a, b) + + result = self.loop.run_until_complete(main()) + self.assertEqual(result, ["A", "B"]) + + # Verify that current_task() was never None and was consistent + for label, current_task in results: + self.assertIsNotNone(current_task, f"current_task() was None at {label}") + + # Verify we got results from both tasks + task_a_results = [r for r in results if r[0].startswith('task_a')] + task_b_results = [r for r in results if r[0].startswith('task_b')] + self.assertEqual(len(task_a_results), 2) + self.assertEqual(len(task_b_results), 2) + + +@unittest.skipUnless(hasattr(tasks, '_CTask'), + 'requires the C _asyncio module') +class CTask_CurrentTask_Tests(CurrentTaskTestsMixin, + test_utils.TestCase): + Task = getattr(tasks, '_CTask', None) + + +class PyTask_CurrentTask_Tests(CurrentTaskTestsMixin, + test_utils.TestCase): + Task = tasks._PyTask + + class CompatibilityTests(test_utils.TestCase): # Tests for checking a bridge between old-styled coroutines # and async/await syntax