Skip to content

Commit 57bf872

Browse files
committed
Add current_task() tests
1 parent 8acc2a8 commit 57bf872

File tree

1 file changed

+99
-0
lines changed

1 file changed

+99
-0
lines changed

Lib/test/test_asyncio/test_tasks.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3710,6 +3710,105 @@ async def coro():
37103710
self.assertEqual(result, 11)
37113711

37123712

3713+
class CurrentTaskTestsMixin:
3714+
"""Tests for current_task() function working with a mixture of _PyTask
3715+
and _CTask instances.
3716+
"""
3717+
3718+
def setUp(self):
3719+
super().setUp()
3720+
self.loop = asyncio.new_event_loop()
3721+
asyncio.set_event_loop(self.loop)
3722+
3723+
def tearDown(self):
3724+
try:
3725+
super().tearDown()
3726+
finally:
3727+
self.loop.close()
3728+
asyncio.set_event_loop(None)
3729+
3730+
async def _test_coro(self):
3731+
# Get the current task - this should not return None
3732+
current = asyncio.current_task()
3733+
self.assertIsNotNone(current, "current_task() should not return None")
3734+
return current
3735+
3736+
3737+
def current_task(self, eager=False):
3738+
"""Test that current_task() works correctly with _PyTask
3739+
3740+
This is a regression test for an issue where current_task() would return
3741+
None when called from within a _PyTask
3742+
The issue was caused by incomplete synchronization between the C and
3743+
Python asyncio implementations.
3744+
"""
3745+
3746+
# Test with eager execution (common case where the bug occurred)
3747+
task = self.Task(self._test_coro(), loop=self.loop, eager_start=eager)
3748+
result = self.loop.run_until_complete(task)
3749+
self.assertIs(result, task)
3750+
3751+
def test_current_task_eager(self):
3752+
"""Test current_task() with eager execution."""
3753+
self.current_task(eager=True)
3754+
3755+
def test_current_task(self):
3756+
"""Test current_task() without eager execution."""
3757+
self.current_task(eager=False)
3758+
3759+
def test_current_task_consistency_after_task_switch(self):
3760+
"""Test that current_task() remains consistent during task switching.
3761+
3762+
This tests the synchronization between C and Python implementations
3763+
when tasks are swapped in and out of execution.
3764+
"""
3765+
results = []
3766+
3767+
async def task_a():
3768+
results.append(('task_a_start', asyncio.current_task()))
3769+
await asyncio.sleep(0) # Yield control
3770+
results.append(('task_a_end', asyncio.current_task()))
3771+
return "A"
3772+
3773+
async def task_b():
3774+
results.append(('task_b_start', asyncio.current_task()))
3775+
await asyncio.sleep(0) # Yield control
3776+
results.append(('task_b_end', asyncio.current_task()))
3777+
return "B"
3778+
3779+
async def main():
3780+
# Start both tasks concurrently
3781+
a = self.Task(task_a())
3782+
b = self.Task(task_b())
3783+
3784+
return await asyncio.gather(a, b)
3785+
3786+
result = self.loop.run_until_complete(main())
3787+
self.assertEqual(result, ["A", "B"])
3788+
3789+
# Verify that current_task() was never None and was consistent
3790+
for label, current_task in results:
3791+
self.assertIsNotNone(current_task, f"current_task() was None at {label}")
3792+
3793+
# Verify we got results from both tasks
3794+
task_a_results = [r for r in results if r[0].startswith('task_a')]
3795+
task_b_results = [r for r in results if r[0].startswith('task_b')]
3796+
self.assertEqual(len(task_a_results), 2)
3797+
self.assertEqual(len(task_b_results), 2)
3798+
3799+
3800+
@unittest.skipUnless(hasattr(tasks, '_CTask'),
3801+
'requires the C _asyncio module')
3802+
class CTask_CurrentTask_Tests(CurrentTaskTestsMixin,
3803+
test_utils.TestCase):
3804+
Task = getattr(tasks, '_CTask', None)
3805+
3806+
3807+
class PyTask_CurrentTask_Tests(CurrentTaskTestsMixin,
3808+
test_utils.TestCase):
3809+
Task = tasks._PyTask
3810+
3811+
37133812
class CompatibilityTests(test_utils.TestCase):
37143813
# Tests for checking a bridge between old-styled coroutines
37153814
# and async/await syntax

0 commit comments

Comments
 (0)