Skip to content

Commit 2518c76

Browse files
committed
fix asyncio staggered leaking tasks, and logging unhandled exceptions.append exception
1 parent f157485 commit 2518c76

File tree

1 file changed

+49
-26
lines changed

1 file changed

+49
-26
lines changed

Lib/asyncio/staggered.py

Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,25 @@ async def staggered_race(coro_fns, delay, *, loop=None):
6666
enum_coro_fns = enumerate(coro_fns)
6767
winner_result = None
6868
winner_index = None
69-
exceptions = []
70-
running_tasks = []
69+
running_tasks = set()
70+
on_completed_fut = None
71+
72+
def task_done(task):
73+
running_tasks.discard(task)
74+
if (
75+
on_completed_fut is not None
76+
and not on_completed_fut.done()
77+
and not running_tasks
78+
):
79+
on_completed_fut.set_result(True)
80+
81+
if task.cancelled():
82+
return
83+
84+
exc = task.exception()
85+
if exc is None:
86+
return
87+
unhandled_exceptions.append(exc)
7188

7289
async def run_one_coro(ok_to_start, previous_failed) -> None:
7390
# in eager tasks this waits for the calling task to append this task
@@ -91,11 +108,11 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
91108
this_failed = locks.Event()
92109
next_ok_to_start = locks.Event()
93110
next_task = loop.create_task(run_one_coro(next_ok_to_start, this_failed))
94-
running_tasks.append(next_task)
95-
# next_task has been appended to running_tasks so next_task is ok to
111+
running_tasks.add(next_task)
112+
next_task.add_done_callback(task_done)
113+
# next_task has been appended to running_tasks so next_task is ok to
96114
# start.
97115
next_ok_to_start.set()
98-
assert len(running_tasks) == this_index + 2
99116
# Prepare place to put this coroutine's exceptions if not won
100117
exceptions.append(None)
101118
assert len(exceptions) == this_index + 1
@@ -120,31 +137,37 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
120137
# up as done() == True, cancelled() == False, exception() ==
121138
# asyncio.CancelledError. This behavior is specified in
122139
# https://bugs.python.org/issue30048
123-
for i, t in enumerate(running_tasks):
124-
if i != this_index:
140+
current_task = tasks.current_task(loop)
141+
for t in running_tasks:
142+
if t is not current_task:
125143
t.cancel()
126144

127-
ok_to_start = locks.Event()
128-
first_task = loop.create_task(run_one_coro(ok_to_start, None))
129-
running_tasks.append(first_task)
130-
# first_task has been appended to running_tasks so first_task is ok to start.
131-
ok_to_start.set()
145+
unhandled_exceptions = []
146+
exceptions = []
132147
try:
133-
# Wait for a growing list of tasks to all finish: poor man's version of
134-
# curio's TaskGroup or trio's nursery
135-
done_count = 0
136-
while done_count != len(running_tasks):
137-
done, _ = await tasks.wait(running_tasks)
138-
done_count = len(done)
148+
ok_to_start = locks.Event()
149+
first_task = loop.create_task(run_one_coro(ok_to_start, None))
150+
running_tasks.add(first_task)
151+
first_task.add_done_callback(task_done)
152+
# first_task has been appended to running_tasks so first_task is ok to start.
153+
ok_to_start.set()
154+
propagate_cancellation_error = None
155+
while running_tasks:
156+
if on_completed_fut is None:
157+
on_completed_fut = loop.create_future()
158+
try:
159+
await on_completed_fut
160+
except exceptions_mod.CancelledError as ex:
161+
propagate_cancellation_error = ex
162+
for task in running_tasks:
163+
task.cancel(*ex.args)
164+
on_completed_fut = None
165+
if __debug__ and unhandled_exceptions:
139166
# If run_one_coro raises an unhandled exception, it's probably a
140167
# programming error, and I want to see it.
141-
if __debug__:
142-
for d in done:
143-
if d.done() and not d.cancelled() and d.exception():
144-
raise d.exception()
168+
raise ExceptionGroup("multiple errors in staggered race", unhandled_exceptions)
169+
if propagate_cancellation_error is not None:
170+
raise propagate_cancellation_error
145171
return winner_result, winner_index, exceptions
146172
finally:
147-
del exceptions
148-
# Make sure no tasks are left running if we leave this function
149-
for t in running_tasks:
150-
t.cancel()
173+
del exceptions, propagate_cancellation_error, unhandled_exceptions

0 commit comments

Comments
 (0)