Skip to content

Commit 3a43a89

Browse files
committed
Clean up retry-tasks on shutdown
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 9be386c commit 3a43a89

File tree

1 file changed

+16
-7
lines changed

1 file changed

+16
-7
lines changed

src/frequenz/dispatch/_actor_dispatcher.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,12 @@ def new_receiver(self) -> Receiver[Dispatch]:
155155
"""
156156
return self._channel.new_receiver()
157157

158+
def cleanup(self) -> None:
159+
"""Clean up the retry manager."""
160+
for task in self._tasks:
161+
task.cancel()
162+
self._tasks.clear()
163+
158164
def retry(self, dispatch: Dispatch) -> None:
159165
"""Retry a dispatch.
160166
@@ -290,13 +296,16 @@ async def _run(self) -> None:
290296
async for dispatch in self._dispatch_rx:
291297
await self._handle_dispatch(dispatch)
292298
else:
293-
retry_recv = self._retrier.new_receiver()
294-
295-
async for selected in select(retry_recv, self._dispatch_rx):
296-
if retry_recv.triggered(selected):
297-
self._retrier.retry(selected.message)
298-
elif self._dispatch_rx.triggered(selected):
299-
await self._handle_dispatch(selected.message)
299+
try:
300+
retry_recv = self._retrier.new_receiver()
301+
302+
async for selected in select(retry_recv, self._dispatch_rx):
303+
if retry_recv.triggered(selected):
304+
self._retrier.retry(selected.message)
305+
elif self._dispatch_rx.triggered(selected):
306+
await self._handle_dispatch(selected.message)
307+
finally:
308+
self._retrier.cleanup()
300309

301310
async def _handle_dispatch(self, dispatch: Dispatch) -> None:
302311
"""Handle a dispatch.

0 commit comments

Comments
 (0)