Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Lib/asyncio/staggered.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from . import exceptions as exceptions_mod
from . import locks
from . import tasks
from . import futures


async def staggered_race(coro_fns, delay, *, loop=None):
Expand Down Expand Up @@ -63,6 +64,7 @@ async def staggered_race(coro_fns, delay, *, loop=None):
"""
# TODO: when we have aiter() and anext(), allow async iterables in coro_fns.
loop = loop or events.get_running_loop()
parent_task = tasks.current_task(loop)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if we're not running in a task? Should we assert that this is non-None?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one possible outcome is that there's no task or running loop:

import asyncio.staggered

async def main():
    async def asyncfn():
        pass

    await asyncio.staggered.staggered_race([asyncfn, asyncfn], delay=None)

main().__await__().send(None)

we get a nice traceback, saying that:

./python demo.py
Traceback (most recent call last):
  File "/home/graingert/projects/cpython/demo.py", line 9, in <module>
    main().__await__().send(None)
    ~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File "/home/graingert/projects/cpython/demo.py", line 7, in main
    await asyncio.staggered.staggered_race([asyncfn, asyncfn], delay=None)
  File "/home/graingert/projects/cpython/Lib/asyncio/staggered.py", line 66, in staggered_race
    loop = loop or events.get_running_loop()
                   ~~~~~~~~~~~~~~~~~~~~~~~^^
RuntimeError: no running event loop

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you manually pass a loop, and step the coroutine outside of a task:

import asyncio.staggered

async def main(loop):
    async def asyncfn():
        print("hello")

    await asyncio.staggered.staggered_race([asyncfn, asyncfn], delay=None, loop=loop)
    return "world"

loop = asyncio.EventLoop()
coro = main(loop).__await__()
f = coro.send(None)
loop.run_until_complete(f)
try:
    coro.send(None)
except StopIteration as e:
    print(e.value)

by some miracle it all works, this is because future_add_to_awaited_by and future_discard_from_awaited_by is noop if any arg is not a future (eg is None) and we don't have any further use of the parent task

./python demo.py
hello
world

so we should probably leave it like that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's worth testing this usecase though

enum_coro_fns = enumerate(coro_fns)
winner_result = None
winner_index = None
Expand All @@ -73,6 +75,7 @@ async def staggered_race(coro_fns, delay, *, loop=None):

def task_done(task):
running_tasks.discard(task)
futures.future_discard_from_awaited_by(task, parent_task)
if (
on_completed_fut is not None
and not on_completed_fut.done()
Expand Down Expand Up @@ -110,6 +113,7 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
this_failed = locks.Event()
next_ok_to_start = locks.Event()
next_task = loop.create_task(run_one_coro(next_ok_to_start, this_failed))
futures.future_add_to_awaited_by(next_task, parent_task)
running_tasks.add(next_task)
next_task.add_done_callback(task_done)
# next_task has been appended to running_tasks so next_task is ok to
Expand Down Expand Up @@ -148,6 +152,7 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
try:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Above this--for cancelling all the coroutines, future_discard_from_awaited_by should get called, right?

Copy link
Contributor Author

@graingert graingert Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah add_done_callback callbacks are called on cancellation, so it will get called

ok_to_start = locks.Event()
first_task = loop.create_task(run_one_coro(ok_to_start, None))
futures.future_add_to_awaited_by(first_task, parent_task)
running_tasks.add(first_task)
first_task.add_done_callback(task_done)
# first_task has been appended to running_tasks so first_task is ok to start.
Expand All @@ -171,4 +176,4 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
raise propagate_cancellation_error
return winner_result, winner_index, exceptions
finally:
del exceptions, propagate_cancellation_error, unhandled_exceptions
del exceptions, propagate_cancellation_error, unhandled_exceptions, parent_task
63 changes: 63 additions & 0 deletions Lib/test/test_external_inspection.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,69 @@ async def main():
]
self.assertEqual(stack_trace, expected_stack_trace)

@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux",
"Test only runs on Linux and MacOS")
@unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support")
def test_async_staggered_race_remote_stack_trace(self):
# Spawn a process with some realistic Python code
script = textwrap.dedent("""\
import asyncio.staggered
import time
import sys

async def deep():
await asyncio.sleep(0)
fifo_path = sys.argv[1]
with open(fifo_path, "w") as fifo:
fifo.write("ready")
time.sleep(10000)

async def c1():
await asyncio.sleep(0)
await deep()

async def c2():
await asyncio.sleep(10000)

async def main():
await asyncio.staggered.staggered_race(
[c1, c2],
delay=None,
)

asyncio.run(main())
""")
stack_trace = None
with os_helper.temp_dir() as work_dir:
script_dir = os.path.join(work_dir, "script_pkg")
os.mkdir(script_dir)
fifo = f"{work_dir}/the_fifo"
os.mkfifo(fifo)
script_name = _make_test_script(script_dir, 'script', script)
try:
p = subprocess.Popen([sys.executable, script_name, str(fifo)])
with open(fifo, "r") as fifo_file:
response = fifo_file.read()
self.assertEqual(response, "ready")
stack_trace = get_async_stack_trace(p.pid)
except PermissionError:
self.skipTest(
"Insufficient permissions to read the stack trace")
finally:
os.remove(fifo)
p.kill()
p.terminate()
p.wait(timeout=SHORT_TIMEOUT)

# sets are unordered, so we want to sort "awaited_by"s
stack_trace[2].sort(key=lambda x: x[1])

expected_stack_trace = [
['deep', 'c1', 'run_one_coro'], 'Task-2', [[['main'], 'Task-1', []]]
]
self.assertEqual(stack_trace, expected_stack_trace)

@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux",
"Test only runs on Linux and MacOS")
@unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support reporting call graph information from :func:`!asyncio.staggered.staggered_race`
Loading