Skip to content

Commit a6bc362

Browse files
authored
Remove power distribution results from Report objects (#998)
Instead, this PR makes the power distribution results available directly from the `*Pool`s. This should go some way towards reducing the frequency of `Report` objects as reported in #818
2 parents cdede2e + f0128f0 commit a6bc362

22 files changed

+169
-73
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
* Passing a `request_timeout` in calls to `*_pool.propose_power` is no longer supported. It may be specified at application startup, through the new optional `api_power_request_timeout` parameter in the `microgrid.initialize()` method.
2121

22+
- Power distribution results are no longer available through the `power_status` streams in the `*Pool`s. They can now be accessed as a stream from a separate property `power_distribution_results`, which is available from all the `*Pool`s.
23+
2224
## New Features
2325

2426
- Classes `Bounds` and `SystemBounds` now implement the `__contains__` method, allowing the use of the `in` operator to check whether a value falls within the bounds or not.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ dependencies = [
3030
# changing the version
3131
# (plugins.mkdocstrings.handlers.python.import)
3232
"frequenz-client-microgrid >= 0.4.0, < 0.5.0",
33-
"frequenz-channels >= 1.0.0-rc1, < 2.0.0",
33+
"frequenz-channels >= 1.1.0, < 2.0.0",
3434
"networkx >= 2.8, < 4",
3535
"numpy >= 1.26.4, < 2",
3636
"typing_extensions >= 4.6.1, < 5",

src/frequenz/sdk/_internal/_channels.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from ._asyncio import cancel_and_await
1313

1414
T_co = typing.TypeVar("T_co", covariant=True)
15+
U_co = typing.TypeVar("U_co", covariant=True)
1516

1617

1718
class ReceiverFetcher(typing.Generic[T_co], typing.Protocol):
@@ -29,6 +30,36 @@ def new_receiver(self, *, limit: int = 50) -> Receiver[T_co]:
2930
"""
3031

3132

33+
class MappingReceiverFetcher(typing.Generic[T_co, U_co]):
34+
"""A receiver fetcher that can manipulate receivers before returning them."""
35+
36+
def __init__(
37+
self,
38+
fetcher: ReceiverFetcher[T_co],
39+
mapping_function: typing.Callable[[Receiver[T_co]], Receiver[U_co]],
40+
) -> None:
41+
"""Initialize this instance.
42+
43+
Args:
44+
fetcher: The underlying fetcher to get receivers from.
45+
mapping_function: The method to be applied on new receivers before returning
46+
them.
47+
"""
48+
self._fetcher = fetcher
49+
self._mapping_function = mapping_function
50+
51+
def new_receiver(self, *, limit: int = 50) -> Receiver[U_co]:
52+
"""Get a receiver from the channel.
53+
54+
Args:
55+
limit: The maximum size of the receiver.
56+
57+
Returns:
58+
A receiver instance.
59+
"""
60+
return self._mapping_function(self._fetcher.new_receiver(limit=limit))
61+
62+
3263
class _Sentinel:
3364
"""A sentinel to denote that no value has been received yet."""
3465

src/frequenz/sdk/actor/_power_managing/_base_classes.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
if typing.TYPE_CHECKING:
1818
from ...timeseries._base_types import SystemBounds
19-
from .. import power_distributing
2019

2120

2221
@dataclasses.dataclass(frozen=True, kw_only=True)
@@ -52,12 +51,6 @@ class _Report:
5251
target_power: Power | None
5352
"""The currently set power for the components."""
5453

55-
distribution_result: power_distributing.Result | None
56-
"""The result of the last power distribution.
57-
58-
This is `None` if no power distribution has been performed yet.
59-
"""
60-
6154
_inclusion_bounds: timeseries.Bounds[Power] | None
6255
"""The available inclusion bounds for the components, for the actor's priority.
6356
@@ -266,15 +259,13 @@ def get_status(
266259
component_ids: frozenset[int],
267260
priority: int,
268261
system_bounds: SystemBounds,
269-
distribution_result: power_distributing.Result | None,
270262
) -> _Report:
271263
"""Get the bounds for a set of components, for the given priority.
272264
273265
Args:
274266
component_ids: The IDs of the components to get the bounds for.
275267
priority: The priority of the actor for which the bounds are requested.
276268
system_bounds: The system bounds for the components.
277-
distribution_result: The result of the last power distribution.
278269
279270
Returns:
280271
The bounds for the components.

src/frequenz/sdk/actor/_power_managing/_matryoshka.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232

3333
if typing.TYPE_CHECKING:
3434
from ...timeseries._base_types import SystemBounds
35-
from .. import power_distributing
3635

3736
_logger = logging.getLogger(__name__)
3837

@@ -225,15 +224,13 @@ def get_status(
225224
component_ids: frozenset[int],
226225
priority: int,
227226
system_bounds: SystemBounds,
228-
distribution_result: power_distributing.Result | None,
229227
) -> _Report:
230228
"""Get the bounds for the algorithm.
231229
232230
Args:
233231
component_ids: The IDs of the components to get the bounds for.
234232
priority: The priority of the actor for which the bounds are requested.
235233
system_bounds: The system bounds for the components.
236-
distribution_result: The result of the last power distribution.
237234
238235
Returns:
239236
The target power and the available bounds for the given components, for
@@ -245,7 +242,6 @@ def get_status(
245242
target_power=target_power,
246243
_inclusion_bounds=None,
247244
_exclusion_bounds=system_bounds.exclusion_bounds,
248-
distribution_result=distribution_result,
249245
)
250246

251247
lower_bound = system_bounds.inclusion_bounds.lower
@@ -284,7 +280,6 @@ def get_status(
284280
lower=lower_bound, upper=upper_bound
285281
),
286282
_exclusion_bounds=system_bounds.exclusion_bounds,
287-
distribution_result=distribution_result,
288283
)
289284

290285
@override

src/frequenz/sdk/actor/_power_managing/_power_managing_actor.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ def __init__( # pylint: disable=too-many-arguments
9191
self._set_op_power_subscriptions: dict[
9292
frozenset[int], dict[int, Sender[_Report]]
9393
] = {}
94-
self._distribution_results: dict[frozenset[int], power_distributing.Result] = {}
9594

9695
self._set_power_group: BaseAlgorithm = Matryoshka(
9796
max_proposal_age=timedelta(seconds=60.0)
@@ -120,7 +119,6 @@ async def _send_reports(self, component_ids: frozenset[int]) -> None:
120119
component_ids,
121120
priority,
122121
bounds,
123-
self._distribution_results.get(component_ids),
124122
)
125123
await sender.send(status)
126124
for priority, sender in self._set_power_subscriptions.get(
@@ -133,7 +131,6 @@ async def _send_reports(self, component_ids: frozenset[int]) -> None:
133131
bounds,
134132
self._set_op_power_group.get_target_power(component_ids),
135133
),
136-
self._distribution_results.get(component_ids),
137134
)
138135
await sender.send(status)
139136

@@ -403,9 +400,6 @@ async def _run(self) -> None:
403400
)
404401

405402
result = selected.message
406-
self._distribution_results[frozenset(result.request.component_ids)] = (
407-
result
408-
)
409403
if not isinstance(result, power_distributing.Success):
410404
_logger.warning(
411405
"PowerManagingActor: PowerDistributing failed: %s", result

src/frequenz/sdk/microgrid/_data_pipeline.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,9 @@ def new_ev_charger_pool(
269269
power_manager_bounds_subs_sender=(
270270
self._ev_power_wrapper.bounds_subscription_channel.new_sender()
271271
),
272+
power_distribution_results_fetcher=(
273+
self._ev_power_wrapper.distribution_results_fetcher()
274+
),
272275
component_ids=component_ids,
273276
)
274277
)
@@ -343,6 +346,9 @@ def new_pv_pool(
343346
power_manager_bounds_subs_sender=(
344347
self._pv_power_wrapper.bounds_subscription_channel.new_sender()
345348
),
349+
power_distribution_results_fetcher=(
350+
self._pv_power_wrapper.distribution_results_fetcher()
351+
),
346352
component_ids=component_ids,
347353
)
348354

@@ -420,6 +426,9 @@ def new_battery_pool(
420426
power_manager_bounds_subscription_sender=(
421427
self._battery_power_wrapper.bounds_subscription_channel.new_sender()
422428
),
429+
power_distribution_results_fetcher=(
430+
self._battery_power_wrapper.distribution_results_fetcher()
431+
),
423432
min_update_interval=self._resampler_config.resampling_period,
424433
batteries_id=component_ids,
425434
)

src/frequenz/sdk/microgrid/_power_wrapper.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
# pylint: disable=cyclic-import
1717
from frequenz.client.microgrid import ComponentCategory, ComponentType
1818

19+
from .._internal._channels import ReceiverFetcher
20+
1921
# A number of imports had to be done inside functions where they are used, to break
2022
# import cycles.
2123
#
@@ -178,3 +180,7 @@ async def stop(self) -> None:
178180
await self._power_distributing_actor.stop()
179181
if self._power_managing_actor:
180182
await self._power_managing_actor.stop()
183+
184+
def distribution_results_fetcher(self) -> ReceiverFetcher[Result]:
185+
"""Return a fetcher for the power distribution results."""
186+
return self._power_distribution_results_channel

src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
from collections import abc
1414

1515
from ... import timeseries
16-
from ..._internal._channels import ReceiverFetcher
17-
from ...actor import _power_managing
16+
from ..._internal._channels import MappingReceiverFetcher, ReceiverFetcher
17+
from ...actor import _power_managing, power_distributing
1818
from ...timeseries import Energy, Percentage, Power, Sample, Temperature
1919
from .._base_types import SystemBounds
2020
from ..formula_engine import FormulaEngine
@@ -384,6 +384,21 @@ def power_status(self) -> ReceiverFetcher[BatteryPoolReport]:
384384

385385
return channel
386386

387+
@property
388+
def power_distribution_results(self) -> ReceiverFetcher[power_distributing.Result]:
389+
"""Get a receiver to receive power distribution results.
390+
391+
Returns:
392+
A receiver that will stream power distribution results for the pool's set of
393+
batteries.
394+
"""
395+
return MappingReceiverFetcher(
396+
self._pool_ref_store._power_dist_results_fetcher,
397+
lambda recv: recv.filter(
398+
lambda x: x.request.component_ids == self._pool_ref_store._batteries
399+
),
400+
)
401+
387402
@property
388403
def _system_power_bounds(self) -> ReceiverFetcher[SystemBounds]:
389404
"""Get receiver to receive new power bounds when they change.

src/frequenz/sdk/timeseries/battery_pool/_battery_pool_reference_store.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414
from frequenz.client.microgrid import ComponentCategory
1515

1616
from ..._internal._asyncio import cancel_and_await
17+
from ..._internal._channels import ReceiverFetcher
1718
from ...actor._channel_registry import ChannelRegistry
1819
from ...actor._data_sourcing._component_metric_request import ComponentMetricRequest
1920
from ...actor._power_managing._base_classes import Proposal, ReportRequest
21+
from ...actor.power_distributing import Result
2022
from ...actor.power_distributing._component_status import ComponentPoolStatus
2123
from ...microgrid import connection_manager
2224
from ..formula_engine._formula_engine_pool import FormulaEnginePool
@@ -43,6 +45,7 @@ def __init__( # pylint: disable=too-many-arguments
4345
batteries_status_receiver: Receiver[ComponentPoolStatus],
4446
power_manager_requests_sender: Sender[Proposal],
4547
power_manager_bounds_subscription_sender: Sender[ReportRequest],
48+
power_distribution_results_fetcher: ReceiverFetcher[Result],
4649
min_update_interval: timedelta,
4750
batteries_id: Set[int] | None = None,
4851
) -> None:
@@ -63,6 +66,8 @@ def __init__( # pylint: disable=too-many-arguments
6366
requests to the power managing actor.
6467
power_manager_bounds_subscription_sender: A Channel sender for sending
6568
power bounds requests to the power managing actor.
69+
power_distribution_results_fetcher: A ReceiverFetcher for the results from
70+
the power distributing actor.
6671
min_update_interval: Some metrics in BatteryPool are send only when they
6772
change. For these metrics min_update_interval is the minimum time
6873
interval between the following messages.
@@ -105,6 +110,9 @@ def __init__( # pylint: disable=too-many-arguments
105110
self._namespace: str = f"battery-pool-{self._batteries}-{uuid.uuid4()}"
106111
self._power_distributing_namespace: str = f"power-distributor-{self._namespace}"
107112
self._channel_registry: ChannelRegistry = channel_registry
113+
self._power_dist_results_fetcher: ReceiverFetcher[Result] = (
114+
power_distribution_results_fetcher
115+
)
108116
self._formula_pool: FormulaEnginePool = FormulaEnginePool(
109117
self._namespace,
110118
self._channel_registry,

0 commit comments

Comments
 (0)