Skip to content

Commit d4df51d

Browse files
authored
Handle CancelledErrors properly (#1027)
The method `exception()` in `Task` doesn't return `CancelledError` if the task was cancelled; instead it raises the error, so if a task is cancelled an exception would be raised unintentionally. Instead we want to catch `CancelledError` and treat it like any other task error for these tasks, as they are not supposed to be cancelled. This PR also adds some improvements to the resampling actor, to avoid unnecessary restarts. Fixes #1026.
2 parents f3fc397 + 3ca78f8 commit d4df51d

File tree

4 files changed

+80
-43
lines changed

4 files changed

+80
-43
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,4 @@
4141
- Re-expose `ComponentMetricId` to the docs.
4242
- Fixed typing ambiguities when building composite formulas on streaming data.
4343
- Fixed a bug that was causing the `PowerDistributor` to exit if power requests to PV inverters or EV chargers timeout.
44+
- Fixed handling of cancelled tasks in the data sourcing and resampling actors.

src/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -413,11 +413,13 @@ async def clean_tasks(
413413
) -> set[asyncio.Task[None]]:
414414
done, pending = await asyncio.wait(sending_tasks, timeout=0)
415415
for task in done:
416-
if error := task.exception():
417-
_logger.error(
416+
try:
417+
task.result()
418+
# pylint: disable-next=broad-except
419+
except (asyncio.CancelledError, Exception):
420+
_logger.exception(
418421
"Error while processing message in task %s",
419422
task.get_name(),
420-
exc_info=error,
421423
)
422424
return pending
423425

src/frequenz/sdk/actor/_resampling.py

Lines changed: 68 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -106,58 +106,36 @@ async def _run(self) -> None:
106106
107107
- One task to process incoming subscription requests to resample new metrics.
108108
- One task to run the resampler.
109-
110-
Raises:
111-
RuntimeError: If there is some unexpected error while resampling or
112-
handling requests.
113109
"""
114110
tasks_to_cancel: set[asyncio.Task[None]] = set()
115-
try:
116-
subscriptions_task = asyncio.create_task(
117-
self._process_resampling_requests()
118-
)
119-
tasks_to_cancel.add(subscriptions_task)
111+
subscriptions_task: asyncio.Task[None] | None = None
112+
resampling_task: asyncio.Task[None] | None = None
120113

114+
try:
121115
while True:
122-
resampling_task = asyncio.create_task(self._resampler.resample())
123-
tasks_to_cancel.add(resampling_task)
116+
if subscriptions_task is None or subscriptions_task.done():
117+
subscriptions_task = asyncio.create_task(
118+
self._process_resampling_requests()
119+
)
120+
tasks_to_cancel.add(subscriptions_task)
121+
122+
if resampling_task is None or resampling_task.done():
123+
resampling_task = asyncio.create_task(self._resampler.resample())
124+
tasks_to_cancel.add(resampling_task)
125+
124126
done, _ = await asyncio.wait(
125127
[resampling_task, subscriptions_task],
126128
return_when=asyncio.FIRST_COMPLETED,
127129
)
128130

129131
if subscriptions_task in done:
130132
tasks_to_cancel.remove(subscriptions_task)
131-
raise RuntimeError(
132-
"There was a problem with the subscriptions channel."
133-
)
133+
self._log_subscriptions_task_error(subscriptions_task)
134134

135135
if resampling_task in done:
136136
tasks_to_cancel.remove(resampling_task)
137-
# The resampler shouldn't end without an exception
138-
error = resampling_task.exception()
139-
assert (
140-
error is not None
141-
), "The resample() function shouldn't exit normally."
142-
143-
# We don't know what to do with something other than
144-
# ResamplingError, so propagate the exception if that is the
145-
# case.
146-
if not isinstance(error, ResamplingError):
147-
raise error
148-
for source, source_error in error.exceptions.items():
149-
_logger.error(
150-
"Error resampling source %s, removing source...", source
151-
)
152-
removed = self._resampler.remove_timeseries(source)
153-
if not removed:
154-
_logger.warning(
155-
"Got an exception from an unknown source: "
156-
"source=%r, exception=%r",
157-
source,
158-
source_error,
159-
)
160-
# The resampling_task will be re-created if we reached this point
137+
self._log_resampling_task_error(resampling_task)
138+
161139
finally:
162140
await asyncio.gather(*[cancel_and_await(t) for t in tasks_to_cancel])
163141

@@ -171,3 +149,55 @@ async def _run(self) -> None:
171149
# state would be mostly the same as not really leaving the run()
172150
# method and just swallow any exception, which doesn't look super
173151
# smart.
152+
153+
def _log_subscriptions_task_error(
154+
self, subscriptions_task: asyncio.Task[None]
155+
) -> None:
156+
"""Log an error from a stopped subscriptions task.
157+
158+
Args:
159+
subscriptions_task: The subscriptions task.
160+
"""
161+
try:
162+
subscriptions_task.result()
163+
# pylint: disable-next=broad-except
164+
except (Exception, asyncio.CancelledError):
165+
_logger.exception(
166+
"The subscriptions task ended with an exception, restarting..."
167+
)
168+
else:
169+
_logger.error("The subscriptions task ended unexpectedly, restarting...")
170+
171+
def _log_resampling_task_error(self, resampling_task: asyncio.Task[None]) -> None:
172+
"""Log an error from a stopped resampling task.
173+
174+
Args:
175+
resampling_task: The resampling task.
176+
"""
177+
# The resampler shouldn't be cancelled or end without an exception
178+
try:
179+
resampling_task.result()
180+
except ResamplingError as error:
181+
for source, source_error in error.exceptions.items():
182+
_logger.error("Error resampling source %s, removing source...", source)
183+
removed = self._resampler.remove_timeseries(source)
184+
if not removed:
185+
_logger.error(
186+
"Got an exception from an unknown source: "
187+
"source=%r, exception=%r",
188+
source,
189+
source_error,
190+
)
191+
# pylint: disable-next=broad-except
192+
except (Exception, asyncio.CancelledError):
193+
# We don't know what to do with something other than
194+
# ResamplingError, so we log it, restart, and hope for the best.
195+
_logger.exception(
196+
"The resample() function got an unexpected error, restarting..."
197+
)
198+
else:
199+
# The resample function should not end normally, so we log it,
200+
# restart, and hope for the best.
201+
_logger.error(
202+
"The resample() function ended without an exception, restarting..."
203+
)

tests/timeseries/mock_microgrid.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,12 @@ def start_mock_client(
230230
self.init_mock_client(initialize_cb)
231231

232232
def _done_callback(task: asyncio.Task[None]) -> None:
233-
if exc := task.exception():
234-
raise SystemExit(f"Streaming task {task.get_name()!r} failed: {exc}")
233+
try:
234+
task.result()
235+
except (asyncio.CancelledError, Exception) as exc:
236+
raise SystemExit(
237+
f"Streaming task {task.get_name()!r} failed: {exc}"
238+
) from exc
235239

236240
for component_id, coro in self._streaming_coros:
237241
task = asyncio.create_task(coro, name=f"component-id:{component_id}")

0 commit comments

Comments
 (0)