Skip to content

Commit 8acfc11

Browse files
authored
Implement the PowerManagingActor (#692)
This PR add a PowerManagingActor implementation with the Matryoshka algorithm, that uses actor priorities to resolve conflicting power requests when multiple actors are trying to use the same batteries. The Matryoshka algorithm requires actors to specify a desired power value, and optionally, bounds to limit the range of power values available for lower priority actors. Some of the other changes in this PR are: - updates the `microgrid.battery_pool()` method to accept a priority value, and return a new battery pool instance with each call, while using a shared reference store class internally, that helps reuse formulas, metric aggregators and battery status tracking tasks. - update BatteryPool's control methods to send requests to the PowerManagingActor rather than the PowerDistributingActor. - Rename the control methods in the BatteryPool from `set_power/charge/discharge` -> `propose_{power/charge/discharge}`, and update them to accept optional bounds to apply on lower priority actors. The new methods also don't accept a `adjust_power` flag, because the `PowerManagingActor` will always adjust power to fit within the available bounds. - Rename `BatteryPool.power_bounds` to a private method called `_system_power_bounds`, because it will be used only by the PowerManager as the top level bounds. - Add `BatteryPool.power_status`, which streams - bounds adjusted to an actors priority, - the latest target power for a set of batteries, - the result from the power distributor for the last call. - Update the PowerDistributingActor to have a single input and output channel, because the PowerManagingActor will be the only component that uses the PowerDistributingActor from now on. Closes #161
2 parents 41cc513 + 93adcfb commit 8acfc11

31 files changed

+2584
-786
lines changed

RELEASE_NOTES.md

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,21 @@
66

77
## Upgrading
88

9-
<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
9+
- `microgrid.battery_pool()` method now accepts a priority value.
10+
11+
- `BatteryPool`'s control methods
12+
13+
* Original methods `{set_power/charge/discharge}` are now replaced by `propose_{power/charge/discharge}`
14+
* The `propose_*` methods send power proposals to the `PowerManagingActor`, where it can be overridden by proposals from other actors.
15+
* They no longer have the `adjust_power` flag, because the `PowerManagingActor` will always adjust power to fit within the available bounds.
16+
17+
- `BatteryPool`'s reporting methods
18+
19+
* `power_bounds` is replaced by `power_status`
20+
* The `power_status` method streams objects containing:
21+
+ bounds adjusted to the actor's priorities
22+
+ the latest target power for the set of batteries
23+
+ the results from the power distributor for the last request
1024

1125
## New Features
1226

@@ -31,12 +45,11 @@
3145
- Add `fill_value` option to window method to impute missing values. By default missing values are imputed with `NaN`.
3246
- Add `at` method to `MovingWindow` to access a single element and use it in `__getitem__` magic to fully support single element access.
3347

34-
35-
3648
- The PowerDistributingActor now supports n:m relations between inverters and
3749
batteries.
3850
This means that one or more inverters can be connected to one or more batteries.
3951

52+
- A `PowerManagingActor` implementation
4053

4154
## Bug Fixes
4255

benchmarks/power_distribution/power_distributor.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from frequenz.channels import Broadcast
1515

1616
from frequenz.sdk import microgrid
17-
from frequenz.sdk.actor import ChannelRegistry, ResamplerConfig
17+
from frequenz.sdk.actor import ResamplerConfig
1818
from frequenz.sdk.actor.power_distributing import (
1919
BatteryStatus,
2020
Error,
@@ -33,6 +33,10 @@
3333
PORT = 61060
3434

3535

36+
# TODO: this send_requests function uses the battery pool to # pylint: disable=fixme
37+
# send requests, and those no longer go directly to the power distributing actor, but
38+
# instead through the power managing actor. So the below function needs to be updated
39+
# to use the PowerDistributingActor directly.
3640
async def send_requests(batteries: set[int], request_num: int) -> list[Result]:
3741
"""Send requests to the PowerDistributingActor and wait for the response.
3842
@@ -47,10 +51,12 @@ async def send_requests(batteries: set[int], request_num: int) -> list[Result]:
4751
List of the results from the PowerDistributingActor.
4852
"""
4953
battery_pool = microgrid.battery_pool(batteries)
50-
results_rx = battery_pool.power_distribution_results()
51-
result: list[Result] = []
54+
results_rx = battery_pool.power_status.new_receiver()
55+
result: list[Any] = []
5256
for _ in range(request_num):
53-
await battery_pool.set_power(Power(float(random.randrange(100000, 1000000))))
57+
await battery_pool.propose_power(
58+
Power(float(random.randrange(100000, 1000000)))
59+
)
5460
try:
5561
output = await asyncio.wait_for(results_rx.receive(), timeout=3)
5662
if output is None:
@@ -107,10 +113,10 @@ async def run_test( # pylint: disable=too-many-locals
107113

108114
power_request_channel = Broadcast[Request]("power-request")
109115
battery_status_channel = Broadcast[BatteryStatus]("battery-status")
110-
channel_registry = ChannelRegistry(name="power_distributor")
116+
power_result_channel = Broadcast[Result]("power-result")
111117
async with PowerDistributingActor(
112-
channel_registry=channel_registry,
113118
requests_receiver=power_request_channel.new_receiver(),
119+
results_sender=power_result_channel.new_sender(),
114120
battery_status_sender=battery_status_channel.new_sender(),
115121
):
116122
tasks: list[Coroutine[Any, Any, list[Result]]] = []

examples/battery_pool.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ async def main() -> None:
3434
receivers: dict[str, Receiver[Any]] = {
3535
"soc": battery_pool.soc.new_receiver(maxsize=1),
3636
"capacity": battery_pool.capacity.new_receiver(maxsize=1),
37-
"power_bounds": battery_pool.power_bounds.new_receiver(maxsize=1),
37+
# pylint: disable=protected-access
38+
"power_bounds": battery_pool._system_power_bounds.new_receiver(maxsize=1),
39+
# pylint: enable=protected-access
3840
}
3941

4042
merged_channel = MergeNamed[Any](**receivers)
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# License: MIT
2+
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
3+
4+
"""General purpose classes for use with channels."""
5+
6+
import abc
7+
import typing
8+
9+
from frequenz.channels import Receiver
10+
11+
T = typing.TypeVar("T")
12+
13+
14+
class ReceiverFetcher(typing.Generic[T], typing.Protocol):
15+
"""An interface that just exposes a `new_receiver` method."""
16+
17+
@abc.abstractmethod
18+
def new_receiver(self, maxsize: int = 50) -> Receiver[T]:
19+
"""Get a receiver from the channel.
20+
21+
Args:
22+
maxsize: The maximum size of the receiver.
23+
24+
Returns:
25+
A receiver instance.
26+
"""

src/frequenz/sdk/actor/_channel_registry.py

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@
33

44
"""A class that would dynamically create, own and provide access to channels."""
55

6-
from typing import Any
6+
from __future__ import annotations
7+
8+
import typing
79

810
from frequenz.channels import Broadcast, Receiver, Sender
911

12+
from .._internal._channels import ReceiverFetcher
13+
1014

1115
class ChannelRegistry:
1216
"""Dynamically creates, own and provide access to channels.
@@ -22,9 +26,9 @@ def __init__(self, *, name: str) -> None:
2226
name: A unique name for the registry.
2327
"""
2428
self._name = name
25-
self._channels: dict[str, Broadcast[Any]] = {}
29+
self._channels: dict[str, Broadcast[typing.Any]] = {}
2630

27-
def new_sender(self, key: str) -> Sender[Any]:
31+
def new_sender(self, key: str) -> Sender[typing.Any]:
2832
"""Get a sender to a dynamically created channel with the given key.
2933
3034
Args:
@@ -37,18 +41,32 @@ def new_sender(self, key: str) -> Sender[Any]:
3741
self._channels[key] = Broadcast(f"{self._name}-{key}")
3842
return self._channels[key].new_sender()
3943

40-
def new_receiver(self, key: str) -> Receiver[Any]:
44+
def new_receiver(self, key: str, maxsize: int = 50) -> Receiver[typing.Any]:
4145
"""Get a receiver to a dynamically created channel with the given key.
4246
4347
Args:
4448
key: A key to identify the channel.
49+
maxsize: The maximum size of the receiver.
4550
4651
Returns:
4752
A receiver for a dynamically created channel with the given key.
4853
"""
4954
if key not in self._channels:
5055
self._channels[key] = Broadcast(f"{self._name}-{key}")
51-
return self._channels[key].new_receiver()
56+
return self._channels[key].new_receiver(maxsize=maxsize)
57+
58+
def new_receiver_fetcher(self, key: str) -> ReceiverFetcher[typing.Any]:
59+
"""Get a receiver fetcher to a dynamically created channel with the given key.
60+
61+
Args:
62+
key: A key to identify the channel.
63+
64+
Returns:
65+
A receiver fetcher for a dynamically created channel with the given key.
66+
"""
67+
if key not in self._channels:
68+
self._channels[key] = Broadcast(f"{self._name}-{key}")
69+
return _RegistryReceiverFetcher(self, key)
5270

5371
async def _close_channel(self, key: str) -> None:
5472
"""Close a channel with the given key.
@@ -61,3 +79,35 @@ async def _close_channel(self, key: str) -> None:
6179
if key in self._channels:
6280
if channel := self._channels.pop(key, None):
6381
await channel.close()
82+
83+
84+
T = typing.TypeVar("T")
85+
86+
87+
class _RegistryReceiverFetcher(typing.Generic[T]):
88+
"""A receiver fetcher that is bound to a channel registry and a key."""
89+
90+
def __init__(
91+
self,
92+
registry: ChannelRegistry,
93+
key: str,
94+
) -> None:
95+
"""Create a new instance of a receiver fetcher.
96+
97+
Args:
98+
registry: The channel registry.
99+
key: A key to identify the channel.
100+
"""
101+
self._registry = registry
102+
self._key = key
103+
104+
def new_receiver(self, maxsize: int = 50) -> Receiver[T]:
105+
"""Get a receiver from the channel.
106+
107+
Args:
108+
maxsize: The maximum size of the receiver.
109+
110+
Returns:
111+
A receiver instance.
112+
"""
113+
return self._registry.new_receiver(self._key, maxsize)
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A power manager implementation."""
5+
6+
from ._base_classes import Algorithm, Proposal, Report, ReportRequest
7+
from ._power_managing_actor import PowerManagingActor
8+
9+
__all__ = [
10+
"Algorithm",
11+
"PowerManagingActor",
12+
"Proposal",
13+
"Report",
14+
"ReportRequest",
15+
]

0 commit comments

Comments
 (0)