Skip to content

Commit a7ed294

Browse files
committed
Inprogress
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 53b1e02 commit a7ed294

File tree

4 files changed

+115
-58
lines changed

4 files changed

+115
-58
lines changed

src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,5 @@ class EVDistributionConfig:
2323
initial_current: Current = field(default_factory=lambda: Current.from_amperes(10.0))
2424
"""The initial current that can be allocated to an EV charger."""
2525

26-
increase_power_interval: timedelta = timedelta(minutes=1)
26+
increase_power_interval: timedelta = timedelta(seconds=30)
2727
"""The interval at which the power can be increased for an EV charger."""

src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py

Lines changed: 72 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
# License: MIT
22
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
33

4-
"""Manage _ev chargers for the power distributor."""
4+
"""Manage EV chargers for the power distributor."""
55

6+
import asyncio
67
import collections.abc
78
import logging
89
from datetime import datetime, timedelta, timezone
910

10-
from frequenz.channels import Sender
11-
from frequenz.channels.util import Merge
11+
from frequenz.channels import Broadcast, Sender
12+
from frequenz.channels.util import Merge, select, selected_from
1213
from typing_extensions import override
1314

1415
from frequenz.sdk import microgrid
@@ -58,6 +59,9 @@ def __init__(
5859
component_status_tracker_type=EVChargerStatusTracker,
5960
)
6061
self._target_power = Power.zero()
62+
self._target_power_channel = Broadcast[Power]("target_power")
63+
self._target_power_tx = self._target_power_channel.new_sender()
64+
self._task = None
6165

6266
@override
6367
def component_ids(self) -> collections.abc.Set[int]:
@@ -67,6 +71,7 @@ def component_ids(self) -> collections.abc.Set[int]:
6771
@override
6872
async def start(self) -> None:
6973
"""Start the ev charger data manager."""
74+
self._task = asyncio.create_task(self._run_forever())
7075

7176
@override
7277
async def distribute_power(self, request: Request) -> Result:
@@ -78,10 +83,9 @@ async def distribute_power(self, request: Request) -> Result:
7883
Returns:
7984
Result of the distribution.
8085
"""
81-
self._target_power = request.power
82-
used_power = self._evc_states.get_ev_total_used_power()
83-
if self._target_power < used_power:
84-
self._throttle_ev_chargers(used_power - self._target_power)
86+
await self._target_power_tx.send(request.power)
87+
88+
# TODO: check max_power check based on sum of bounds
8589
return Success(
8690
request,
8791
Power.zero(),
@@ -143,10 +147,11 @@ def _act_on_new_data(self, ev_data: EVChargerData) -> list[tuple[int, Power]]:
143147
if ev_previously_connected:
144148
_logger.info("EV disconnected from EV charger %s", component_id)
145149
self._evc_states.get(component_id).update_state(ev_data)
146-
return [(component_id, Power.zero())]
150+
if self._evc_states.get(component_id).last_allocation > Power.zero():
151+
return [(component_id, Power.zero())]
147152

148153
# else if last throttling was less than 'increase_power_interval', do nothing.
149-
now = datetime.utcnow()
154+
now = datetime.now(tz=timezone.utc)
150155
last_throttling_time = self._evc_states.get(component_id).last_reallocation_time
151156
if last_throttling_time is not None:
152157
dur = now - last_throttling_time
@@ -181,7 +186,6 @@ def _act_on_new_data(self, ev_data: EVChargerData) -> list[tuple[int, Power]]:
181186
evc.last_allocation + allottable_power,
182187
Power.from_watts(evc.last_data.active_power_inclusion_upper_bound),
183188
)
184-
evc.update_last_allocation(target_power, now)
185189
_logger.info(
186190
"Increasing power to EV charger %s from %s to %s",
187191
component_id,
@@ -190,38 +194,74 @@ def _act_on_new_data(self, ev_data: EVChargerData) -> list[tuple[int, Power]]:
190194
)
191195
return [(component_id, target_power)]
192196

197+
async def _run_forever(self) -> None:
198+
while True:
199+
try:
200+
await self._run()
201+
except: # pylint: disable=bare-except
202+
_logger.exception("Recovering from an error in EV charger manager.")
203+
await asyncio.sleep(1.0)
204+
193205
async def _run(self) -> None:
194206
api = microgrid.connection_manager.get().api_client
195207
ev_charger_data_rx = Merge(
196208
*[await api.ev_charger_data(evc_id) for evc_id in self._ev_charger_ids]
197209
)
198-
async for evc_data in ev_charger_data_rx:
210+
target_power_rx = self._target_power_channel.new_receiver()
211+
async for selected in select(ev_charger_data_rx, target_power_rx):
199212
bounds_changes = []
200-
# If a new ev charger is added, add it to the state tracker, with
201-
# now as the last reallocation time and last charging time.
202-
#
203-
# This means it won't be assigned any power until the reallocation duration
204-
# has passed.
205-
if evc_data.component_id not in self._evc_states:
206-
now = datetime.now(tz=timezone.utc)
207-
self._evc_states.add_evc(
208-
EvcState(
209-
component_id=evc_data.component_id,
210-
last_data=evc_data,
211-
power=Power.from_watts(evc_data.active_power),
212-
last_allocation=Power.zero(),
213-
last_reallocation_time=now,
214-
last_charging_time=now,
213+
if selected_from(selected, ev_charger_data_rx):
214+
evc_data = selected.value
215+
# If a new ev charger is added, add it to the state tracker, with
216+
# now as the last reallocation time and last charging time.
217+
#
218+
# This means it won't be assigned any power until the reallocation
219+
# duration has passed.
220+
if evc_data.component_id not in self._evc_states:
221+
now = datetime.now(tz=timezone.utc)
222+
self._evc_states.add_evc(
223+
EvcState(
224+
component_id=evc_data.component_id,
225+
last_data=evc_data,
226+
power=Power.zero(),
227+
last_allocation=Power.zero(),
228+
last_reallocation_time=now,
229+
last_charging_time=now,
230+
)
231+
)
232+
bounds_changes = [(evc_data.component_id, Power.zero())]
233+
234+
# See if the ev charger has room for more power, and if the last
235+
# allocation was not in the last reallocation duration.
236+
else:
237+
bounds_changes = self._act_on_new_data(evc_data)
238+
239+
elif selected_from(selected, target_power_rx):
240+
self._target_power = selected.value
241+
_logger.debug("New target power: %s", self._target_power)
242+
used_power = self._evc_states.get_ev_total_used_power()
243+
if self._target_power < used_power:
244+
bounds_changes = self._throttle_ev_chargers(
245+
used_power - self._target_power
215246
)
216-
)
217247

218-
# See if the ev charger has room for more power, and if the last allocation
219-
# was not in the last reallocation duration.
248+
if bounds_changes:
249+
_logger.debug("Setting power to EV chargers: %s", bounds_changes)
220250
else:
221-
bounds_changes = self._act_on_new_data(evc_data)
222-
251+
continue
252+
now = datetime.now(tz=timezone.utc)
223253
for component_id, power in bounds_changes:
224-
await api.set_power(component_id, power.as_watts())
254+
try:
255+
self._evc_states.get(component_id).update_last_allocation(
256+
power, now
257+
)
258+
await api.set_power(component_id, power.as_watts())
259+
except Exception: # pylint: disable=bare-except
260+
_logger.error(
261+
"Failed to set power to EV charger %s to %s",
262+
component_id,
263+
power,
264+
)
225265

226266
def _throttle_ev_chargers(self, throttle_by: Power) -> list[tuple[int, Power]]:
227267
"""Reduce EV charging power to meet the target power.

src/frequenz/sdk/actor/power_distributing/_component_status/_ev_charger_status_tracker.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ def __init__( # pylint: disable=too-many-arguments
7272
@override
7373
def start(self) -> None:
7474
"""Start the status tracker."""
75-
self._tasks.add(asyncio.create_task(self._run()))
75+
self._tasks.add(asyncio.create_task(self._run_forever()))
7676

7777
def _is_working(self, ev_data: EVChargerData) -> bool:
7878
"""Return whether the given EV charger can be assigned power.
@@ -99,9 +99,9 @@ async def _run_forever(self) -> None:
9999
while True:
100100
try:
101101
await self._run()
102-
except Exception as ex: # pylint: disable=broad-except
102+
except: # pylint: disable=broad-except
103103
_logger.exception(
104-
"Restarting after exception in EVChargerStatusTracker: %s", ex
104+
"Restarting after exception in EVChargerStatusTracker"
105105
)
106106
await asyncio.sleep(1.0)
107107

@@ -155,12 +155,15 @@ async def _run(self) -> None:
155155
api_client = connection_manager.get().api_client
156156
ev_data_rx = await api_client.ev_charger_data(self._component_id)
157157
set_power_result_rx = self._set_power_result_receiver
158-
missing_data_timer = Timer(self._max_data_age, SkipMissedAndDrift())
158+
# TODO: Add missing data timer once resets are fixed in channels
159+
# missing_data_timer = Timer(self._max_data_age, SkipMissedAndDrift())
160+
missing_data_timer = Timer(timedelta(seconds=100.0), SkipMissedAndDrift())
159161
async for selected in select(
160162
ev_data_rx, set_power_result_rx, missing_data_timer
161163
):
162164
new_status = ComponentStatusEnum.NOT_WORKING
163165
if selected_from(selected, ev_data_rx):
166+
missing_data_timer.reset()
164167
new_status = self._handle_ev_data(selected.value)
165168
elif selected_from(selected, set_power_result_rx):
166169
new_status = self._handle_set_power_result(selected.value)
@@ -172,7 +175,10 @@ async def _run(self) -> None:
172175
)
173176

174177
# Send status update if status changed
175-
if self._blocking_status.is_blocked():
178+
if (
179+
self._blocking_status.is_blocked()
180+
and new_status != ComponentStatusEnum.NOT_WORKING
181+
):
176182
new_status = ComponentStatusEnum.UNCERTAIN
177183

178184
if new_status != self._last_status:

src/frequenz/sdk/timeseries/ev_charger_pool/_system_bounds_tracker.py

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55

66

77
import asyncio
8+
import logging
89
from collections import abc
10+
from datetime import datetime
911

1012
from frequenz.channels import Receiver, Sender
1113
from frequenz.channels.util import Merge, select, selected_from
@@ -16,6 +18,8 @@
1618
from .. import Power
1719
from .._base_types import Bounds, SystemBounds
1820

21+
_logger = logging.getLogger(__name__)
22+
1923

2024
class EVCSystemBoundsTracker(BackgroundService):
2125
"""Track the system bounds for the EV chargers."""
@@ -41,12 +45,15 @@ def __init__(
4145
self._bounds_sender = bounds_sender
4246
self._latest_component_data: dict[int, microgrid.component.EVChargerData] = {}
4347
self._last_sent_bounds: SystemBounds | None = None
48+
self._component_pool_status = ComponentPoolStatus(set(), set())
4449

4550
def start(self) -> None:
4651
"""Start the EV charger system bounds tracker."""
4752
self._tasks.add(asyncio.create_task(self._run()))
4853

4954
async def _send_bounds(self) -> None:
55+
if not self._latest_component_data:
56+
return
5057
inclusion_bounds = Bounds(
5158
lower=Power.from_watts(
5259
sum(
@@ -102,25 +109,29 @@ async def _run(self) -> None:
102109
)
103110
)
104111

105-
component_pool_status = ComponentPoolStatus(set(), set())
106-
107-
async for selected in select(status_rx, ev_data_rx):
108-
if selected_from(selected, status_rx):
109-
status = selected.value
110-
component_pool_status = status
111-
for comp_id in self._latest_component_data:
112+
try:
113+
async for selected in select(status_rx, ev_data_rx):
114+
if selected_from(selected, status_rx):
115+
self._component_pool_status = selected.value
116+
to_pop = []
117+
for comp_id in self._latest_component_data:
118+
if (
119+
comp_id not in self._component_pool_status.working
120+
and comp_id not in self._component_pool_status.uncertain
121+
):
122+
to_pop.append(comp_id)
123+
for comp_id in to_pop:
124+
self._latest_component_data.pop(comp_id, None)
125+
elif selected_from(selected, ev_data_rx):
126+
data = selected.value
127+
comp_id = data.component_id
112128
if (
113-
comp_id not in component_pool_status.working
114-
and comp_id not in component_pool_status.uncertain
129+
comp_id not in self._component_pool_status.working
130+
and comp_id not in self._component_pool_status.uncertain
115131
):
116-
self._latest_component_data.pop(comp_id, None)
117-
elif selected_from(selected, ev_data_rx):
118-
if (
119-
comp_id not in component_pool_status.working
120-
and comp_id not in component_pool_status.uncertain
121-
):
122-
continue
123-
data = selected.value
124-
self._latest_component_data[data.component_id] = data
125-
126-
await self._send_bounds()
132+
continue
133+
self._latest_component_data[data.component_id] = data
134+
135+
await self._send_bounds()
136+
except:
137+
_logger.exception("bounds tracker failed")

0 commit comments

Comments
 (0)