Skip to content

Commit 0c2308d

Browse files
committed
Expose power distribution results directly from the battery pool
This would stream results only for the batteries that are part of the pool. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 00ab2bf commit 0c2308d

File tree

3 files changed

+28
-2
lines changed

3 files changed

+28
-2
lines changed

src/frequenz/sdk/microgrid/_data_pipeline.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,9 @@ def new_battery_pool(
420420
power_manager_bounds_subscription_sender=(
421421
self._battery_power_wrapper.bounds_subscription_channel.new_sender()
422422
),
423+
power_distribution_results_fetcher=(
424+
self._battery_power_wrapper.distribution_results_fetcher()
425+
),
423426
min_update_interval=self._resampler_config.resampling_period,
424427
batteries_id=component_ids,
425428
)

src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
from collections import abc
1414

1515
from ... import timeseries
16-
from ..._internal._channels import ReceiverFetcher
17-
from ...actor import _power_managing
16+
from ..._internal._channels import ReceiverFetcher, ReceiverFetcherWith
17+
from ...actor import _power_managing, power_distributing
1818
from ...timeseries import Energy, Percentage, Power, Sample, Temperature
1919
from .._base_types import SystemBounds
2020
from ..formula_engine import FormulaEngine
@@ -384,6 +384,21 @@ def power_status(self) -> ReceiverFetcher[BatteryPoolReport]:
384384

385385
return channel
386386

387+
@property
388+
def power_distribution_results(self) -> ReceiverFetcher[power_distributing.Result]:
389+
"""Get a receiver to receive power distribution results.
390+
391+
Returns:
392+
A receiver that will stream power distribution results for the pool's set of
393+
batteries.
394+
"""
395+
return ReceiverFetcherWith(
396+
self._pool_ref_store._power_dist_results_fetcher,
397+
lambda recv: recv.filter(
398+
lambda x: x.request.component_ids == self._pool_ref_store._batteries
399+
),
400+
)
401+
387402
@property
388403
def _system_power_bounds(self) -> ReceiverFetcher[SystemBounds]:
389404
"""Get receiver to receive new power bounds when they change.

src/frequenz/sdk/timeseries/battery_pool/_battery_pool_reference_store.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414
from frequenz.client.microgrid import ComponentCategory
1515

1616
from ..._internal._asyncio import cancel_and_await
17+
from ..._internal._channels import ReceiverFetcher
1718
from ...actor._channel_registry import ChannelRegistry
1819
from ...actor._data_sourcing._component_metric_request import ComponentMetricRequest
1920
from ...actor._power_managing._base_classes import Proposal, ReportRequest
21+
from ...actor.power_distributing import Result
2022
from ...actor.power_distributing._component_status import ComponentPoolStatus
2123
from ...microgrid import connection_manager
2224
from ..formula_engine._formula_engine_pool import FormulaEnginePool
@@ -43,6 +45,7 @@ def __init__( # pylint: disable=too-many-arguments
4345
batteries_status_receiver: Receiver[ComponentPoolStatus],
4446
power_manager_requests_sender: Sender[Proposal],
4547
power_manager_bounds_subscription_sender: Sender[ReportRequest],
48+
power_distribution_results_fetcher: ReceiverFetcher[Result],
4649
min_update_interval: timedelta,
4750
batteries_id: Set[int] | None = None,
4851
) -> None:
@@ -63,6 +66,8 @@ def __init__( # pylint: disable=too-many-arguments
6366
requests to the power managing actor.
6467
power_manager_bounds_subscription_sender: A Channel sender for sending
6568
power bounds requests to the power managing actor.
69+
power_distribution_results_fetcher: A ReceiverFetcher for the results from
70+
the power distributing actor.
6671
min_update_interval: Some metrics in BatteryPool are send only when they
6772
change. For these metrics min_update_interval is the minimum time
6873
interval between the following messages.
@@ -105,6 +110,9 @@ def __init__( # pylint: disable=too-many-arguments
105110
self._namespace: str = f"battery-pool-{self._batteries}-{uuid.uuid4()}"
106111
self._power_distributing_namespace: str = f"power-distributor-{self._namespace}"
107112
self._channel_registry: ChannelRegistry = channel_registry
113+
self._power_dist_results_fetcher: ReceiverFetcher[Result] = (
114+
power_distribution_results_fetcher
115+
)
108116
self._formula_pool: FormulaEnginePool = FormulaEnginePool(
109117
self._namespace,
110118
self._channel_registry,

0 commit comments

Comments
 (0)