|
12 | 12 |
|
13 | 13 | import typing |
14 | 14 | from dataclasses import dataclass |
| 15 | +from datetime import timedelta |
15 | 16 |
|
16 | | -from frequenz.channels import Broadcast, Sender |
| 17 | +from frequenz.channels import Bidirectional, Broadcast, Sender |
17 | 18 |
|
18 | 19 | # A number of imports had to be done inside functions where they are used, to break |
19 | 20 | # import cycles. |
|
26 | 27 | DataSourcingActor, |
27 | 28 | ResamplerConfig, |
28 | 29 | ) |
| 30 | + from ..actor.power_distributing import ( |
| 31 | + BatteryStatus, |
| 32 | + PowerDistributingActor, |
| 33 | + Request, |
| 34 | + Result, |
| 35 | + ) |
29 | 36 | from ..timeseries.ev_charger_pool import EVChargerPool |
30 | 37 | from ..timeseries.logical_meter import LogicalMeter |
31 | 38 |
|
@@ -70,6 +77,13 @@ def __init__( |
70 | 77 |
|
71 | 78 | self._data_sourcing_actor: _ActorInfo | None = None |
72 | 79 | self._resampling_actor: _ActorInfo | None = None |
| 80 | + |
| 81 | + self._power_distribution_channel = Bidirectional["Request", "Result"]( |
| 82 | + "Default", "Power Distributing Actor" |
| 83 | + ) |
| 84 | + |
| 85 | + self._power_distributing_actor: "PowerDistributingActor" | None = None |
| 86 | + |
73 | 87 | self._logical_meter: "LogicalMeter" | None = None |
74 | 88 | self._ev_charger_pools: dict[frozenset[int], "EVChargerPool"] = {} |
75 | 89 |
|
@@ -123,6 +137,32 @@ def ev_charger_pool( |
123 | 137 | ) |
124 | 138 | return self._ev_charger_pools[key] |
125 | 139 |
|
| 140 | + def power_distributing_handle(self) -> Bidirectional.Handle[Request, Result]: |
| 141 | + """Return the handle to the power distributing actor. |
| 142 | +
|
| 143 | + Returns: |
| 144 | + A Bidirectional handle to communicate with the power distributing actor. |
| 145 | + """ |
| 146 | + if not self._power_distributing_actor: |
| 147 | + self._start_power_distributing_actor() |
| 148 | + |
| 149 | + return self._power_distribution_channel.client_handle |
| 150 | + |
| 151 | + def _start_power_distributing_actor(self) -> None: |
| 152 | + """Start the power distributing actor if it is not already running.""" |
| 153 | + if self._power_distributing_actor: |
| 154 | + return |
| 155 | + |
| 156 | + from ..actor.power_distributing import PowerDistributingActor |
| 157 | + |
| 158 | + # The PowerDistributingActor is started with only a single default user channel. |
| 159 | + # Until the PowerManager is implemented, support for multiple use-case actors |
| 160 | + # will not be available in the high level interface. |
| 161 | + self._power_distributing_actor = PowerDistributingActor( |
| 162 | + users_channels={"default": self._power_distribution_channel.service_handle}, |
| 163 | + battery_status_sender=self._battery_status_channel.new_sender(), |
| 164 | + ) |
| 165 | + |
126 | 166 | def _data_sourcing_request_sender(self) -> Sender[ComponentMetricRequest]: |
127 | 167 | """Return a Sender for sending requests to the data sourcing actor. |
128 | 168 |
|
|
0 commit comments