Skip to content

Commit 2cd0b79

Browse files
committed
Add a BackgroundService abstract base class
A `BackgroundService` is a service that runs in the background spawning one or more tasks. The service can be started and stopped and can work as an async context manager to provide deterministic cleanup. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 6e28342 commit 2cd0b79

File tree

2 files changed

+242
-0
lines changed

2 files changed

+242
-0
lines changed

src/frequenz/sdk/actor/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"""A base class for creating simple composable actors."""
55

66
from ..timeseries._resampling import ResamplerConfig
7+
from ._background_service import BackgroundService
78
from ._channel_registry import ChannelRegistry
89
from ._config_managing import ConfigManagingActor
910
from ._data_sourcing import ComponentMetricRequest, DataSourcingActor
@@ -12,6 +13,7 @@
1213
from ._run_utils import run
1314

1415
__all__ = [
16+
"BackgroundService",
1517
"ChannelRegistry",
1618
"ComponentMetricRequest",
1719
"ComponentMetricsResamplingActor",
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Background service implementation."""
5+
6+
import abc
7+
import asyncio
8+
import collections.abc
9+
from types import TracebackType
10+
from typing import Any, Self
11+
12+
13+
class BackgroundService(abc.ABC):
14+
"""A background service that can be started and stopped.
15+
16+
A background service is a service that runs in the background spawning one or more
17+
tasks. The service can be [started][frequenz.sdk.actor.BackgroundService.start]
18+
and [stopped][frequenz.sdk.actor.BackgroundService.stop] and can work as an
19+
async context manager to provide deterministic cleanup.
20+
21+
To implement a background service, subclasses must implement the
22+
[`start()`][frequenz.sdk.actor.BackgroundService.start] method, which should
23+
start the background tasks needed by the service, and add them to the `_tasks`
24+
protected attribute.
25+
26+
If you need to collect results or handle exceptions of the tasks when stopping the
27+
service, then you need to also override the
28+
[`stop()`][frequenz.sdk.actor.BackgroundService.stop] method, as the base
29+
implementation does not collect any results and re-raises all exceptions.
30+
31+
!!! warning
32+
33+
As background services manage [`asyncio.Task`][] objects, a reference to them
34+
must be held for as long as the background service is expected to be running,
35+
otherwise its tasks will be cancelled and the service will stop. For more
36+
information, please refer to the [Python `asyncio`
37+
documentation](https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task).
38+
39+
Example:
40+
```python
41+
import datetime
42+
import asyncio
43+
44+
class Clock(BackgroundService):
45+
def __init__(self, resolution_s: float, *, name: str | None = None) -> None:
46+
super().__init__(name=name)
47+
self._resolution_s = resolution_s
48+
49+
async def start(self) -> None:
50+
self._tasks.add(asyncio.create_task(self._tick()))
51+
52+
async def _tick(self) -> None:
53+
while True:
54+
await asyncio.sleep(self._resolution_s)
55+
print(datetime.datetime.now())
56+
57+
async def main() -> None:
58+
# As an async context manager
59+
async with Clock(resolution_s=1):
60+
await asyncio.sleep(5)
61+
62+
# Manual start/stop (only use if necessary, as cleanup is more complicated)
63+
clock = Clock(resolution_s=1)
64+
await clock.start()
65+
await asyncio.sleep(5)
66+
await clock.stop()
67+
68+
asyncio.run(main())
69+
```
70+
"""
71+
72+
def __init__(self, *, name: str | None = None) -> None:
73+
"""Initialize this BackgroundService.
74+
75+
Args:
76+
name: The name of this background service. If `None`, `str(id(self))` will
77+
be used. This is used mostly for debugging purposes.
78+
"""
79+
self._name: str = str(id(self)) if name is None else name
80+
self._tasks: set[asyncio.Task[Any]] = set()
81+
82+
@abc.abstractmethod
83+
async def start(self) -> None:
84+
"""Start this background service."""
85+
86+
@property
87+
def name(self) -> str:
88+
"""The name of this background service.
89+
90+
Returns:
91+
The name of this background service.
92+
"""
93+
return self._name
94+
95+
@property
96+
def tasks(self) -> collections.abc.Set[asyncio.Task[Any]]:
97+
"""Return the set of running tasks spawned by this background service.
98+
99+
Users typically should not modify the tasks in the returned set and only use
100+
them for informational purposes.
101+
102+
!!! danger
103+
104+
Changing the returned tasks may lead to unexpected behavior, don't do it
105+
unless the class explicitly documents it is safe to do so.
106+
107+
Returns:
108+
The set of running tasks spawned by this background service.
109+
"""
110+
return self._tasks
111+
112+
@property
113+
def is_running(self) -> bool:
114+
"""Return whether this background service is running.
115+
116+
A service is considered running when at least one task is running.
117+
118+
Returns:
119+
Whether this background service is running.
120+
"""
121+
return any(not task.done() for task in self._tasks)
122+
123+
def cancel(self, msg: str | None = None) -> None:
124+
"""Cancel all running tasks spawned by this background service.
125+
126+
Args:
127+
msg: The message to be passed to the tasks being cancelled.
128+
"""
129+
for task in self._tasks:
130+
task.cancel(msg)
131+
132+
async def stop(self, msg: str | None = None) -> None:
133+
"""Stop this background service.
134+
135+
This method cancels all running tasks spawned by this service and waits for them
136+
to finish.
137+
138+
Args:
139+
msg: The message to be passed to the tasks being cancelled.
140+
141+
Raises:
142+
BaseExceptionGroup: If any of the tasks spawned by this service raised an
143+
exception.
144+
"""
145+
if not self._tasks:
146+
return
147+
self.cancel(msg)
148+
await self.wait()
149+
150+
async def __aenter__(self) -> Self:
151+
"""Enter an async context.
152+
153+
Start this background service.
154+
155+
Returns:
156+
This background service.
157+
"""
158+
await self.start()
159+
return self
160+
161+
async def __aexit__(
162+
self,
163+
exc_type: type[BaseException] | None,
164+
exc_val: BaseException | None,
165+
exc_tb: TracebackType | None,
166+
) -> None:
167+
"""Exit an async context.
168+
169+
Stop this background service.
170+
171+
Args:
172+
exc_type: The type of the exception raised, if any.
173+
exc_val: The exception raised, if any.
174+
exc_tb: The traceback of the exception raised, if any.
175+
"""
176+
await self.stop()
177+
178+
async def wait(self) -> None:
179+
"""Wait this background service to finish.
180+
181+
Wait until all background service tasks are finished.
182+
183+
Raises:
184+
BaseExceptionGroup: If any of the tasks spawned by this service raised an
185+
exception (`CancelError` is not considered an error and not returned in
186+
the exception group).
187+
"""
188+
# We need to account for tasks that were created between when we started
189+
# awaiting and we finished awaiting.
190+
while self._tasks:
191+
done, pending = await asyncio.wait(self._tasks)
192+
assert not pending
193+
194+
# We remove the done tasks, but there might be new ones created after we
195+
# started waiting.
196+
self._tasks = self._tasks - done
197+
198+
exceptions: list[BaseException] = []
199+
for task in done:
200+
if task.cancelled():
201+
continue
202+
if exception := task.exception():
203+
exceptions.append(exception)
204+
if exceptions:
205+
raise BaseExceptionGroup(
206+
f"Error while stopping background service {self}", exceptions
207+
)
208+
209+
def __await__(self) -> collections.abc.Generator[None, None, None]:
210+
"""Await this background service.
211+
212+
An awaited background service will wait for all its tasks to finish.
213+
214+
Returns:
215+
An implementation-specific generator for the awaitable.
216+
"""
217+
return self.wait().__await__()
218+
219+
def __del__(self) -> None:
220+
"""Destroy this instance.
221+
222+
Cancel all running tasks spawned by this background service.
223+
"""
224+
self.cancel("{self!r} was deleted")
225+
226+
def __repr__(self) -> str:
227+
"""Return a string representation of this instance.
228+
229+
Returns:
230+
A string representation of this instance.
231+
"""
232+
return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"
233+
234+
def __str__(self) -> str:
235+
"""Return a string representation of this instance.
236+
237+
Returns:
238+
A string representation of this instance.
239+
"""
240+
return f"{type(self).__name__}[{self._name}]"

0 commit comments

Comments
 (0)