Skip to content

Commit 4fb5a88

Browse files
authored
Cleanup the actor package (#1031)
The `frequenz.sdk.actor` package was shared by the `Actor` implementation and by a number of core actors of the SDK and other utilities. This PR moves all the core actors and utilities into more appropriate locations. - `frequenz.sdk.` + `actor.ChannelRegistry` → `_internal._channels.ChannelRegistry` + `actor.ConfigManagingActor` → `config.ConfigManagingActor` + `actor._power_managing` → `microgrid._power_managing` + `actor.power_distributing` → `microgrid._power_distributing` + `actor._resampling` → `microgrid._resampling` + `actor._data_sourcing` → `microgrid._data_sourcing` This is part of #575
2 parents 0592777 + 698f879 commit 4fb5a88

File tree

84 files changed

+359
-386
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

84 files changed

+359
-386
lines changed

RELEASE_NOTES.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,21 @@
2323

2424
- The `ConfigManagingActor` now uses `collections.abc.Mapping` as the output sender type. This change indicates that the broadcasted configuration is intended to be read-only.
2525

26+
- The `ConfigManagingActor` has moved from `frequenz.sdk.actor` to `frequenz.sdk.config`.
27+
28+
- The following core actors are no longer part of the public API:
29+
- `PowerDistributingActor`
30+
- `ComponentMetricsResamplingActor`
31+
- `DataSourcingActor`
32+
33+
- The following two types which are used for communicating with the data sourcing and resampling actors are also no longer part of the public API:
34+
- `ComponentMetricId`
35+
- `ComponentMetricRequest`
36+
37+
- The `ChannelRegistry` is no longer part of the public API.
38+
39+
- The `Result` types for the power distribution results are now exposed through the `frequenz.sdk.microgrid.battery_pool` module.
40+
2641
## New Features
2742

2843
- Classes `Bounds` and `SystemBounds` now implement the `__contains__` method, allowing the use of the `in` operator to check whether a value falls within the bounds or not.

benchmarks/power_distribution/power_distributor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616

1717
from frequenz.sdk import microgrid
1818
from frequenz.sdk.actor import ResamplerConfig
19-
from frequenz.sdk.actor.power_distributing import (
19+
from frequenz.sdk.microgrid import connection_manager
20+
from frequenz.sdk.microgrid._power_distributing import (
2021
ComponentPoolStatus,
2122
Error,
2223
OutOfBounds,
@@ -26,7 +27,6 @@
2627
Result,
2728
Success,
2829
)
29-
from frequenz.sdk.microgrid import connection_manager
3030
from frequenz.sdk.timeseries._quantities import Power
3131

3232
HOST = "microgrid.sandbox.api.frequenz.io"

benchmarks/timeseries/benchmark_datasourcing.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
from frequenz.client.microgrid import ComponentMetricId
2121

2222
from frequenz.sdk import microgrid
23-
from frequenz.sdk.actor import (
24-
ChannelRegistry,
23+
from frequenz.sdk._internal._channels import ChannelRegistry
24+
from frequenz.sdk.microgrid._data_sourcing import (
2525
ComponentMetricRequest,
2626
DataSourcingActor,
2727
)

src/frequenz/sdk/_internal/_channels.py

Lines changed: 145 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,18 @@
44
"""General purpose classes for use with channels."""
55

66
import abc
7+
import dataclasses
8+
import logging
9+
import traceback
710
import typing
811

9-
from frequenz.channels import Receiver
12+
from frequenz.channels import Broadcast, Receiver
13+
14+
_logger = logging.getLogger(__name__)
1015

1116
T_co = typing.TypeVar("T_co", covariant=True)
1217
U_co = typing.TypeVar("U_co", covariant=True)
18+
T = typing.TypeVar("T")
1319

1420

1521
class ReceiverFetcher(typing.Generic[T_co], typing.Protocol):
@@ -55,3 +61,141 @@ def new_receiver(self, *, limit: int = 50) -> Receiver[U_co]:
5561
A receiver instance.
5662
"""
5763
return self._mapping_function(self._fetcher.new_receiver(limit=limit))
64+
65+
66+
class ChannelRegistry:
67+
"""Dynamically creates, own and provide access to broadcast channels.
68+
69+
It can be used by actors to dynamically establish a communication channel
70+
between each other.
71+
72+
The registry is responsible for creating channels when they are first requested via
73+
the [`get_or_create()`][frequenz.sdk.actor.ChannelRegistry.get_or_create] method.
74+
75+
The registry also stores type information to make sure that the same channel is not
76+
used for different message types.
77+
78+
Since the registry owns the channels, it is also responsible for closing them when
79+
they are no longer needed. There is no way to remove a channel without closing it.
80+
81+
Note:
82+
This registry stores [`Broadcast`][frequenz.channels.Broadcast] channels.
83+
"""
84+
85+
def __init__(self, *, name: str) -> None:
86+
"""Initialize this registry.
87+
88+
Args:
89+
name: A name to identify the registry in the logs. This name is also used as
90+
a prefix for the channel names.
91+
"""
92+
self._name = name
93+
self._channels: dict[str, _Entry] = {}
94+
95+
@property
96+
def name(self) -> str:
97+
"""The name of this registry."""
98+
return self._name
99+
100+
def message_type(self, key: str) -> type:
101+
"""Get the message type of the channel for the given key.
102+
103+
Args:
104+
key: The key to identify the channel.
105+
106+
Returns:
107+
The message type of the channel.
108+
109+
Raises:
110+
KeyError: If the channel does not exist.
111+
"""
112+
entry = self._channels.get(key)
113+
if entry is None:
114+
raise KeyError(f"No channel for key {key!r} exists.")
115+
return entry.message_type
116+
117+
def __contains__(self, key: str) -> bool:
118+
"""Check whether the channel for the given `key` exists."""
119+
return key in self._channels
120+
121+
def get_or_create(self, message_type: type[T], key: str) -> Broadcast[T]:
122+
"""Get or create a channel for the given key.
123+
124+
If a channel for the given key already exists, the message type of the existing
125+
channel is checked against the requested message type. If they do not match,
126+
a `ValueError` is raised.
127+
128+
Note:
129+
The types have to match exactly, it doesn't do a subtype check due to
130+
technical limitations. In the future subtype checks might be supported.
131+
132+
Args:
133+
message_type: The type of the message that is sent through the channel.
134+
key: The key to identify the channel.
135+
136+
Returns:
137+
The channel for the given key.
138+
139+
Raises:
140+
ValueError: If the channel exists and the message type does not match.
141+
"""
142+
if key not in self._channels:
143+
if _logger.isEnabledFor(logging.DEBUG):
144+
_logger.debug(
145+
"Creating a new channel for key %r with type %s at:\n%s",
146+
key,
147+
message_type,
148+
"".join(traceback.format_stack(limit=10)[:9]),
149+
)
150+
self._channels[key] = _Entry(
151+
message_type, Broadcast(name=f"{self._name}-{key}")
152+
)
153+
154+
entry = self._channels[key]
155+
if entry.message_type is not message_type:
156+
exception = ValueError(
157+
f"Type mismatch, a channel for key {key!r} exists and the requested "
158+
f"message type {message_type} is not the same as the existing "
159+
f"message type {entry.message_type}."
160+
)
161+
if _logger.isEnabledFor(logging.DEBUG):
162+
_logger.debug(
163+
"%s at:\n%s",
164+
str(exception),
165+
# We skip the last frame because it's this method, and limit the
166+
# stack to 9 frames to avoid adding too much noise.
167+
"".join(traceback.format_stack(limit=10)[:9]),
168+
)
169+
raise exception
170+
171+
return typing.cast(Broadcast[T], entry.channel)
172+
173+
async def close_and_remove(self, key: str) -> None:
174+
"""Remove the channel for the given key.
175+
176+
Args:
177+
key: The key to identify the channel.
178+
179+
Raises:
180+
KeyError: If the channel does not exist.
181+
"""
182+
entry = self._channels.pop(key, None)
183+
if entry is None:
184+
raise KeyError(f"No channel for key {key!r} exists.")
185+
await entry.channel.close()
186+
187+
188+
@dataclasses.dataclass(frozen=True)
189+
class _Entry:
190+
"""An entry in a channel registry."""
191+
192+
message_type: type
193+
"""The type of the message that is sent through the channel in this entry."""
194+
195+
# We use object instead of Any to minimize the chances of hindering type checking.
196+
# If for some reason the channel is not casted to the proper underlaying type, when
197+
# using object at least accessing any member that's not part of the object base
198+
# class will yield a type error, while if we used Any, it would not and the issue
199+
# would be much harder to find.
200+
channel: Broadcast[object]
201+
"""The channel in this entry."""

src/frequenz/sdk/actor/__init__.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -599,21 +599,11 @@ async def main() -> None: # (6)!
599599
from ..timeseries._resampling import ResamplerConfig
600600
from ._actor import Actor
601601
from ._background_service import BackgroundService
602-
from ._channel_registry import ChannelRegistry
603-
from ._config_managing import ConfigManagingActor
604-
from ._data_sourcing import ComponentMetricId, ComponentMetricRequest, DataSourcingActor
605-
from ._resampling import ComponentMetricsResamplingActor
606602
from ._run_utils import run
607603

608604
__all__ = [
609605
"Actor",
610606
"BackgroundService",
611-
"ChannelRegistry",
612-
"ComponentMetricId",
613-
"ComponentMetricRequest",
614-
"ComponentMetricsResamplingActor",
615-
"ConfigManagingActor",
616-
"DataSourcingActor",
617607
"ResamplerConfig",
618608
"run",
619609
]

src/frequenz/sdk/actor/_channel_registry.py

Lines changed: 0 additions & 152 deletions
This file was deleted.
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Read and update config variables."""
5+
6+
from ._config_managing import ConfigManagingActor
7+
8+
__all__ = ["ConfigManagingActor"]
File renamed without changes.

0 commit comments

Comments
 (0)