Skip to content

Commit 80fa959

Browse files
committed
Avoid unnecessary restarts of the resampling actor
When the subscriptions task ends, the actor is raising a `RuntimeException` which causes the actor to restart, which means re-creating not only the subscriptions task but also the resampling task, which could introduce glitches into the resampling streams unnecessarily. In this commit we just restart the subscriptions tasks instead of the whole actor. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 5212a47 commit 80fa959

File tree

1 file changed

+24
-15
lines changed

1 file changed

+24
-15
lines changed

src/frequenz/sdk/actor/_resampling.py

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -106,32 +106,41 @@ async def _run(self) -> None:
106106
107107
- One task to process incoming subscription requests to resample new metrics.
108108
- One task to run the resampler.
109-
110-
Raises:
111-
RuntimeError: If there is some unexpected error while resampling or
112-
handling requests.
113109
"""
114110
tasks_to_cancel: set[asyncio.Task[None]] = set()
115-
try:
116-
subscriptions_task = asyncio.create_task(
117-
self._process_resampling_requests()
118-
)
119-
tasks_to_cancel.add(subscriptions_task)
111+
subscriptions_task: asyncio.Task[None] | None = None
112+
resampling_task: asyncio.Task[None] | None = None
120113

114+
try:
121115
while True:
116+
if subscriptions_task is None or subscriptions_task.done():
117+
subscriptions_task = asyncio.create_task(
118+
self._process_resampling_requests()
119+
)
120+
tasks_to_cancel.add(subscriptions_task)
121+
122+
if resampling_task is None or resampling_task.done():
123+
resampling_task = asyncio.create_task(self._resampler.resample())
124+
tasks_to_cancel.add(resampling_task)
122125

123-
resampling_task = asyncio.create_task(self._resampler.resample())
124-
tasks_to_cancel.add(resampling_task)
125126
done, _ = await asyncio.wait(
126127
[resampling_task, subscriptions_task],
127128
return_when=asyncio.FIRST_COMPLETED,
128129
)
129130

130131
if subscriptions_task in done:
131132
tasks_to_cancel.remove(subscriptions_task)
132-
raise RuntimeError(
133-
"There was a problem with the subscriptions channel."
134-
)
133+
try:
134+
subscriptions_task.result()
135+
# pylint: disable-next=broad-except
136+
except (Exception, asyncio.CancelledError):
137+
_logger.exception(
138+
"The subscriptions task ended with an exception, restarting..."
139+
)
140+
else:
141+
_logger.error(
142+
"The subscriptions task ended unexpectedly, restarting..."
143+
)
135144

136145
if resampling_task in done:
137146
tasks_to_cancel.remove(resampling_task)
@@ -164,7 +173,7 @@ async def _run(self) -> None:
164173
_logger.error(
165174
"The resample() function ended without an exception, restarting..."
166175
)
167-
# The resampling_task will be re-created if we reached this point
176+
168177
finally:
169178
await asyncio.gather(*[cancel_and_await(t) for t in tasks_to_cancel])
170179

0 commit comments

Comments
 (0)