Skip to content

Commit df383ee

Browse files
Add BatteryPool and PowerDistributingActor to the DataPipeline (#317)
2 parents fb3972d + a50550d commit df383ee

File tree

1 file changed

+84
-2
lines changed

1 file changed

+84
-2
lines changed

src/frequenz/sdk/microgrid/_data_pipeline.py

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@
1111
from __future__ import annotations
1212

1313
import typing
14+
from collections import abc
1415
from dataclasses import dataclass
16+
from datetime import timedelta
1517

16-
from frequenz.channels import Broadcast, Sender
18+
from frequenz.channels import Bidirectional, Broadcast, Sender
1719

1820
# A number of imports had to be done inside functions where they are used, to break
1921
# import cycles.
@@ -26,9 +28,17 @@
2628
DataSourcingActor,
2729
ResamplerConfig,
2830
)
31+
from ..actor.power_distributing import (
32+
BatteryStatus,
33+
PowerDistributingActor,
34+
Request,
35+
Result,
36+
)
37+
from ..timeseries.battery_pool import BatteryPool
2938
from ..timeseries.ev_charger_pool import EVChargerPool
3039
from ..timeseries.logical_meter import LogicalMeter
3140

41+
3242
_REQUEST_RECV_BUFFER_SIZE = 500
3343
"""The maximum number of requests that can be queued in the request receiver.
3444
@@ -70,8 +80,17 @@ def __init__(
7080

7181
self._data_sourcing_actor: _ActorInfo | None = None
7282
self._resampling_actor: _ActorInfo | None = None
83+
84+
self._battery_status_channel = Broadcast["BatteryStatus"]("battery-status")
85+
self._power_distribution_channel = Bidirectional["Request", "Result"](
86+
"Default", "Power Distributing Actor"
87+
)
88+
89+
self._power_distributing_actor: "PowerDistributingActor" | None = None
90+
7391
self._logical_meter: "LogicalMeter" | None = None
7492
self._ev_charger_pools: dict[frozenset[int], "EVChargerPool"] = {}
93+
self._battery_pools: dict[frozenset[int], "BatteryPool"] = {}
7594

7695
def logical_meter(self) -> LogicalMeter:
7796
"""Return the logical meter instance.
@@ -102,7 +121,7 @@ def ev_charger_pool(
102121
created and returned.
103122
104123
Args:
105-
ev_charger_ids: Optional set of IDs of EV Charger to be managed by the
124+
ev_charger_ids: Optional set of IDs of EV Chargers to be managed by the
106125
EVChargerPool.
107126
108127
Returns:
@@ -123,6 +142,69 @@ def ev_charger_pool(
123142
)
124143
return self._ev_charger_pools[key]
125144

145+
def battery_pool(
146+
self,
147+
battery_ids: abc.Set[int] | None = None,
148+
) -> BatteryPool:
149+
"""Return the corresponding BatteryPool instance for the given ids.
150+
151+
If a BatteryPool instance for the given ids doesn't exist, a new one is created
152+
and returned.
153+
154+
Args:
155+
battery_ids: Optional set of IDs of batteries to be managed by the
156+
BatteryPool.
157+
158+
Returns:
159+
A BatteryPool instance.
160+
"""
161+
from ..timeseries.battery_pool import BatteryPool
162+
163+
if not self._power_distributing_actor:
164+
self._start_power_distributing_actor()
165+
166+
# We use frozenset to make a hashable key from the input set.
167+
key: frozenset[int] = frozenset()
168+
if battery_ids is not None:
169+
key = frozenset(battery_ids)
170+
171+
if key not in self._battery_pools:
172+
self._battery_pools[key] = BatteryPool(
173+
batteries_status_receiver=self._battery_status_channel.new_receiver(),
174+
min_update_interval=timedelta(
175+
seconds=self._resampler_config.resampling_period_s
176+
),
177+
batteries_id=battery_ids,
178+
)
179+
180+
return self._battery_pools[key]
181+
182+
def power_distributing_handle(self) -> Bidirectional.Handle[Request, Result]:
183+
"""Return the handle to the power distributing actor.
184+
185+
Returns:
186+
A Bidirectional handle to communicate with the power distributing actor.
187+
"""
188+
if not self._power_distributing_actor:
189+
self._start_power_distributing_actor()
190+
191+
return self._power_distribution_channel.client_handle
192+
193+
def _start_power_distributing_actor(self) -> None:
194+
"""Start the power distributing actor if it is not already running."""
195+
if self._power_distributing_actor:
196+
return
197+
198+
from ..actor.power_distributing import PowerDistributingActor
199+
200+
# The PowerDistributingActor is started with only a single default user channel.
201+
# Until the PowerManager is implemented, support for multiple use-case actors
202+
# will not be available in the high level interface.
203+
self._power_distributing_actor = PowerDistributingActor(
204+
users_channels={"default": self._power_distribution_channel.service_handle},
205+
battery_status_sender=self._battery_status_channel.new_sender(),
206+
)
207+
126208
def _data_sourcing_request_sender(self) -> Sender[ComponentMetricRequest]:
127209
"""Return a Sender for sending requests to the data sourcing actor.
128210

0 commit comments

Comments
 (0)