Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,19 @@ name = "frequenz-core"
description = "Core utilities to complement Python's standard library"
readme = "README.md"
license = { text = "MIT" }
keywords = ["frequenz", "python", "lib", "library", "core", "stdlib"]
keywords = [
"asyncio",
"collections",
"core",
"datetime",
"frequenz",
"lib",
"library",
"math",
"python",
"stdlib",
"typing",
]
classifiers = [
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Developers",
Expand Down Expand Up @@ -157,7 +169,7 @@ packages = ["frequenz.core"]
strict = true

[[tool.mypy.overrides]]
module = ["mkdocs_macros.*", "sybil", "sybil.*"]
module = ["mkdocs_macros.*", "sybil", "sybil.*", "async_solipsism"]
ignore_missing_imports = true

[tool.setuptools_scm]
Expand Down
23 changes: 1 addition & 22 deletions src/frequenz/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,4 @@
# License: MIT
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""Core utilities to complement Python's standard library.

TODO(cookiecutter): Add a more descriptive module description.
"""


# TODO(cookiecutter): Remove this function
def delete_me(*, blow_up: bool = False) -> bool:
"""Do stuff for demonstration purposes.

Args:
blow_up: If True, raise an exception.

Returns:
True if no exception was raised.

Raises:
RuntimeError: if blow_up is True.
"""
if blow_up:
raise RuntimeError("This function should be removed!")
return True
"""Core utilities to complement Python's standard library."""
282 changes: 282 additions & 0 deletions src/frequenz/core/asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
# License: MIT
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""General purpose async tools.

This module provides general purpose async tools that can be used to simplify the
development of asyncio-based applications.

The module provides the following classes and functions:

- [cancel_and_await][frequenz.core.asyncio.cancel_and_await]: A function that cancels a
task and waits for it to finish, handling `CancelledError` exceptions.
- [BackgroundService][frequenz.core.asyncio.BackgroundService]: A base class for
implementing background services that can be started and stopped.
"""


import abc
import asyncio
import collections.abc
from types import TracebackType
from typing import Any, Self


async def cancel_and_await(task: asyncio.Task[Any]) -> None:
"""Cancel a task and wait for it to finish.

Exits immediately if the task is already done.

The `CancelledError` is suppressed, but any other exception will be propagated.

Args:
task: The task to be cancelled and waited for.
"""
if task.done():
return
task.cancel()
try:
await task
except asyncio.CancelledError:
pass


class BackgroundService(abc.ABC):
"""A background service that can be started and stopped.

A background service is a service that runs in the background spawning one or more
tasks. The service can be [started][frequenz.core.asyncio.BackgroundService.start]
and [stopped][frequenz.core.asyncio.BackgroundService.stop] and can work as an
async context manager to provide deterministic cleanup.

To implement a background service, subclasses must implement the
[`start()`][frequenz.core.asyncio.BackgroundService.start] method, which should
start the background tasks needed by the service, and add them to the `_tasks`
protected attribute.

If you need to collect results or handle exceptions of the tasks when stopping the
service, then you need to also override the
[`stop()`][frequenz.core.asyncio.BackgroundService.stop] method, as the base
implementation does not collect any results and re-raises all exceptions.

!!! warning

As background services manage [`asyncio.Task`][] objects, a reference to them
must be held for as long as the background service is expected to be running,
otherwise its tasks will be cancelled and the service will stop. For more
information, please refer to the [Python `asyncio`
documentation](https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task).

Example:
```python
import datetime
import asyncio

class Clock(BackgroundService):
def __init__(self, resolution_s: float, *, name: str | None = None) -> None:
super().__init__(name=name)
self._resolution_s = resolution_s

def start(self) -> None:
self._tasks.add(asyncio.create_task(self._tick()))

async def _tick(self) -> None:
while True:
await asyncio.sleep(self._resolution_s)
print(datetime.datetime.now())

async def main() -> None:
# As an async context manager
async with Clock(resolution_s=1):
await asyncio.sleep(5)

# Manual start/stop (only use if necessary, as cleanup is more complicated)
clock = Clock(resolution_s=1)
clock.start()
await asyncio.sleep(5)
await clock.stop()

asyncio.run(main())
```
"""

def __init__(self, *, name: str | None = None) -> None:
"""Initialize this BackgroundService.

Args:
name: The name of this background service. If `None`, `str(id(self))` will
be used. This is used mostly for debugging purposes.
"""
self._name: str = str(id(self)) if name is None else name
self._tasks: set[asyncio.Task[Any]] = set()

@abc.abstractmethod
def start(self) -> None:
"""Start this background service."""

@property
def name(self) -> str:
"""The name of this background service.

Returns:
The name of this background service.
"""
return self._name

@property
def tasks(self) -> collections.abc.Set[asyncio.Task[Any]]:
"""Return the set of running tasks spawned by this background service.

Users typically should not modify the tasks in the returned set and only use
them for informational purposes.

!!! danger

Changing the returned tasks may lead to unexpected behavior, don't do it
unless the class explicitly documents it is safe to do so.

Returns:
The set of running tasks spawned by this background service.
"""
return self._tasks

@property
def is_running(self) -> bool:
"""Return whether this background service is running.

A service is considered running when at least one task is running.

Returns:
Whether this background service is running.
"""
return any(not task.done() for task in self._tasks)

def cancel(self, msg: str | None = None) -> None:
"""Cancel all running tasks spawned by this background service.

Args:
msg: The message to be passed to the tasks being cancelled.
"""
for task in self._tasks:
task.cancel(msg)

async def stop(self, msg: str | None = None) -> None:
"""Stop this background service.

This method cancels all running tasks spawned by this service and waits for them
to finish.

Args:
msg: The message to be passed to the tasks being cancelled.

Raises:
BaseExceptionGroup: If any of the tasks spawned by this service raised an
exception.
"""
if not self._tasks:
return
self.cancel(msg)
try:
await self.wait()
except BaseExceptionGroup as exc_group:
# We want to ignore CancelledError here as we explicitly cancelled all the
# tasks.
_, rest = exc_group.split(asyncio.CancelledError)
if rest is not None:
# We are filtering out from an exception group, we really don't want to
# add the exceptions we just filtered by adding a from clause here.
raise rest # pylint: disable=raise-missing-from

async def __aenter__(self) -> Self:
"""Enter an async context.

Start this background service.

Returns:
This background service.
"""
self.start()
return self

async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
"""Exit an async context.

Stop this background service.

Args:
exc_type: The type of the exception raised, if any.
exc_val: The exception raised, if any.
exc_tb: The traceback of the exception raised, if any.
"""
await self.stop()

async def wait(self) -> None:
"""Wait this background service to finish.

Wait until all background service tasks are finished.

Raises:
BaseExceptionGroup: If any of the tasks spawned by this service raised an
exception (`CancelError` is not considered an error and not returned in
the exception group).
"""
# We need to account for tasks that were created between when we started
# awaiting and we finished awaiting.
while self._tasks:
done, pending = await asyncio.wait(self._tasks)
assert not pending

# We remove the done tasks, but there might be new ones created after we
# started waiting.
self._tasks = self._tasks - done

exceptions: list[BaseException] = []
for task in done:
try:
# This will raise a CancelledError if the task was cancelled or any
# other exception if the task raised one.
_ = task.result()
except BaseException as error: # pylint: disable=broad-except
exceptions.append(error)
if exceptions:
raise BaseExceptionGroup(
f"Error while stopping background service {self}", exceptions
)

def __await__(self) -> collections.abc.Generator[None, None, None]:
"""Await this background service.

An awaited background service will wait for all its tasks to finish.

Returns:
An implementation-specific generator for the awaitable.
"""
return self.wait().__await__()

def __del__(self) -> None:
"""Destroy this instance.

Cancel all running tasks spawned by this background service.
"""
self.cancel("{self!r} was deleted")

def __repr__(self) -> str:
"""Return a string representation of this instance.

Returns:
A string representation of this instance.
"""
return f"{type(self).__name__}(name={self._name!r}, tasks={self._tasks!r})"

def __str__(self) -> str:
"""Return a string representation of this instance.

Returns:
A string representation of this instance.
"""
return f"{type(self).__name__}[{self._name}]"
9 changes: 9 additions & 0 deletions src/frequenz/core/datetime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# License: MIT
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""Timeseries basic types."""

from datetime import datetime, timezone

UNIX_EPOCH = datetime.fromtimestamp(0.0, tz=timezone.utc)
"""The UNIX epoch (in UTC)."""
Loading