Skip to content

Commit 5212a47

Browse files
committed
Catch CancelledErrors properly
The method `exception()` in `Task` doesn't return `CancelledError` if the task was cancelled, instead it raises the error, so if a task is cancelled an exception would be raised unintentionally. Instead we want to catch `CancelledError` and treat it like any other task error for these tasks, are they are not supposed to be cancelled. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 8b93da9 commit 5212a47

File tree

3 files changed

+39
-26
lines changed

3 files changed

+39
-26
lines changed

src/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -413,11 +413,13 @@ async def clean_tasks(
413413
) -> set[asyncio.Task[None]]:
414414
done, pending = await asyncio.wait(sending_tasks, timeout=0)
415415
for task in done:
416-
if error := task.exception():
417-
_logger.error(
416+
try:
417+
task.result()
418+
# pylint: disable-next=broad-except
419+
except (asyncio.CancelledError, Exception):
420+
_logger.exception(
418421
"Error while processing message in task %s",
419422
task.get_name(),
420-
exc_info=error,
421423
)
422424
return pending
423425

src/frequenz/sdk/actor/_resampling.py

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ async def _run(self) -> None:
119119
tasks_to_cancel.add(subscriptions_task)
120120

121121
while True:
122+
122123
resampling_task = asyncio.create_task(self._resampler.resample())
123124
tasks_to_cancel.add(resampling_task)
124125
done, _ = await asyncio.wait(
@@ -134,29 +135,35 @@ async def _run(self) -> None:
134135

135136
if resampling_task in done:
136137
tasks_to_cancel.remove(resampling_task)
137-
# The resampler shouldn't end without an exception
138-
error = resampling_task.exception()
139-
assert (
140-
error is not None
141-
), "The resample() function shouldn't exit normally."
142-
143-
# We don't know what to do with something other than
144-
# ResamplingError, so propagate the exception if that is the
145-
# case.
146-
if not isinstance(error, ResamplingError):
147-
raise error
148-
for source, source_error in error.exceptions.items():
138+
# The resampler shouldn't be cancelled or end without an exception
139+
try:
140+
resampling_task.result()
141+
except ResamplingError as error:
142+
for source, source_error in error.exceptions.items():
143+
_logger.error(
144+
"Error resampling source %s, removing source...", source
145+
)
146+
removed = self._resampler.remove_timeseries(source)
147+
if not removed:
148+
_logger.error(
149+
"Got an exception from an unknown source: "
150+
"source=%r, exception=%r",
151+
source,
152+
source_error,
153+
)
154+
# pylint: disable-next=broad-except
155+
except (Exception, asyncio.CancelledError):
156+
# We don't know what to do with something other than
157+
# ResamplingError, so we log it, restart, and hope for the best.
158+
_logger.exception(
159+
"The resample() function got an unexpected error, restarting..."
160+
)
161+
else:
162+
# The resample function should not end normally, so we log it,
163+
# restart, and hope for the best.
149164
_logger.error(
150-
"Error resampling source %s, removing source...", source
165+
"The resample() function ended without an exception, restarting..."
151166
)
152-
removed = self._resampler.remove_timeseries(source)
153-
if not removed:
154-
_logger.warning(
155-
"Got an exception from an unknown source: "
156-
"source=%r, exception=%r",
157-
source,
158-
source_error,
159-
)
160167
# The resampling_task will be re-created if we reached this point
161168
finally:
162169
await asyncio.gather(*[cancel_and_await(t) for t in tasks_to_cancel])

tests/timeseries/mock_microgrid.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,12 @@ def start_mock_client(
230230
self.init_mock_client(initialize_cb)
231231

232232
def _done_callback(task: asyncio.Task[None]) -> None:
233-
if exc := task.exception():
234-
raise SystemExit(f"Streaming task {task.get_name()!r} failed: {exc}")
233+
try:
234+
task.result()
235+
except (asyncio.CancelledError, Exception) as exc:
236+
raise SystemExit(
237+
f"Streaming task {task.get_name()!r} failed: {exc}"
238+
) from exc
235239

236240
for component_id, coro in self._streaming_coros:
237241
task = asyncio.create_task(coro, name=f"component-id:{component_id}")

0 commit comments

Comments
 (0)