Skip to content

Commit 88416e5

Browse files
Add a EVChargerPool.set_bounds method for setting current bounds (#297)
This method would send the given current bound for the given EV Charger component ID, to the Microgrid API. It would also repeat the last bound to the microgrid, every configured interval, so that our requests don't expire. Bounds are used to limit the max current drawn by an EV, although the exact value will be determined by the EV.
2 parents 78a1edb + 737bed5 commit 88416e5

File tree

4 files changed

+177
-0
lines changed

4 files changed

+177
-0
lines changed

src/frequenz/sdk/_internal/asyncio.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,15 @@
1313
async def cancel_and_await(task: asyncio.Task[Any]) -> None:
1414
"""Cancel a task and wait for it to finish.
1515
16+
Exits immediately if the task is already done.
17+
1618
The `CancelledError` is suppresed, but any other exception will be propagated.
1719
1820
Args:
1921
task: The task to be cancelled and waited for.
2022
"""
23+
if task.done():
24+
return
2125
task.cancel()
2226
try:
2327
await task

src/frequenz/sdk/timeseries/ev_charger_pool/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
"""Interactions with EV Chargers."""
55

66
from ._ev_charger_pool import EVChargerData, EVChargerPool, EVChargerPoolError
7+
from ._set_current_bounds import ComponentCurrentLimit
78
from ._state_tracker import EVChargerState
89

910
__all__ = [
11+
"ComponentCurrentLimit",
1012
"EVChargerPool",
1113
"EVChargerData",
1214
"EVChargerPoolError",

src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from asyncio import Task
1212
from collections import abc
1313
from dataclasses import dataclass
14+
from datetime import timedelta
1415

1516
from frequenz.channels import Broadcast, ChannelClosedError, Receiver, Sender
1617

@@ -24,6 +25,7 @@
2425
EVChargerPowerFormula,
2526
FormulaGeneratorConfig,
2627
)
28+
from ._set_current_bounds import BoundsSetter, ComponentCurrentLimit
2729
from ._state_tracker import EVChargerState, StateTracker
2830

2931
logger = logging.getLogger(__name__)
@@ -50,6 +52,7 @@ def __init__(
5052
channel_registry: ChannelRegistry,
5153
resampler_subscription_sender: Sender[ComponentMetricRequest],
5254
component_ids: set[int] | None = None,
55+
repeat_interval: timedelta = timedelta(seconds=3.0),
5356
) -> None:
5457
"""Create an `EVChargerPool` instance.
5558
@@ -61,8 +64,11 @@ def __init__(
6164
component_ids: An optional list of component_ids belonging to this pool. If
6265
not specified, IDs of all EV Chargers in the microgrid will be fetched
6366
from the component graph.
67+
repeat_interval: Interval after which to repeat the last set bounds to the
68+
microgrid API, if no new calls to `set_bounds` have been made.
6469
"""
6570
self._channel_registry: ChannelRegistry = channel_registry
71+
self._repeat_interval = repeat_interval
6672
self._resampler_subscription_sender: Sender[
6773
ComponentMetricRequest
6874
] = resampler_subscription_sender
@@ -87,6 +93,7 @@ def __init__(
8793
self._channel_registry,
8894
self._resampler_subscription_sender,
8995
)
96+
self._bounds_setter: BoundsSetter | None = None
9097

9198
@property
9299
def component_ids(self) -> abc.Set[int]:
@@ -158,6 +165,40 @@ async def component_data(self, component_id: int) -> Receiver[EVChargerData]:
158165

159166
return output_chan.new_receiver()
160167

168+
async def set_bounds(self, component_id: int, max_amps: float) -> None:
169+
"""Send given max current bound for the given EV Charger to the microgrid API.
170+
171+
Bounds are used to limit the max current drawn by an EV, although the exact
172+
value will be determined by the EV.
173+
174+
Args:
175+
component_id: ID of EV Charger to set the current bounds to.
176+
max_amps: maximum current in amps, that an EV can draw from this EV Charger.
177+
"""
178+
if not self._bounds_setter:
179+
self._bounds_setter = BoundsSetter(self._repeat_interval)
180+
await self._bounds_setter.set(component_id, max_amps)
181+
182+
def new_bounds_sender(self) -> Sender[ComponentCurrentLimit]:
183+
"""Return a `Sender` for setting EV Charger current bounds with.
184+
185+
Bounds are used to limit the max current drawn by an EV, although the exact
186+
value will be determined by the EV.
187+
188+
Returns:
189+
A new `Sender`.
190+
"""
191+
if not self._bounds_setter:
192+
self._bounds_setter = BoundsSetter(self._repeat_interval)
193+
return self._bounds_setter.new_bounds_sender()
194+
195+
async def stop(self) -> None:
196+
"""Stop all tasks and channels owned by the EVChargerPool."""
197+
if self._bounds_setter:
198+
await self._bounds_setter.stop()
199+
if self._state_tracker:
200+
await self._state_tracker.stop()
201+
161202
async def _get_current_streams(
162203
self, component_id: int
163204
) -> tuple[Receiver[Sample], Receiver[Sample], Receiver[Sample]]:
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A task for sending EV Charger power bounds to the microgrid API."""
5+
6+
import asyncio
7+
import logging
8+
from dataclasses import dataclass
9+
from datetime import timedelta
10+
from typing import Dict
11+
12+
from frequenz.channels import Broadcast, Sender
13+
from frequenz.channels.util import Select, Timer
14+
15+
from ..._internal.asyncio import cancel_and_await
16+
from ...microgrid import connection_manager
17+
from ...microgrid.component import ComponentCategory
18+
19+
logger = logging.getLogger(__name__)
20+
21+
22+
@dataclass
23+
class ComponentCurrentLimit:
24+
"""A current limit, to be sent to the EV Charger."""
25+
26+
component_id: int
27+
max_amps: float
28+
29+
30+
class BoundsSetter:
31+
"""A task for sending EV Charger power bounds to the microgrid API.
32+
33+
Also, periodically resends the last set bounds to the microgrid API, if no new
34+
bounds have been set.
35+
"""
36+
37+
_NUM_PHASES = 3
38+
39+
def __init__(self, repeat_interval: timedelta) -> None:
40+
"""Create a `BoundsSetter` instance.
41+
42+
Args:
43+
repeat_interval: Interval after which to repeat the last set bounds to the
44+
microgrid API, if no new calls to `set_bounds` have been made.
45+
"""
46+
self._repeat_interval = repeat_interval
47+
48+
self._task: asyncio.Task[None] = asyncio.create_task(self._run())
49+
self._bounds_chan: Broadcast[ComponentCurrentLimit] = Broadcast("BoundsSetter")
50+
self._bounds_rx = self._bounds_chan.new_receiver()
51+
self._bounds_tx = self._bounds_chan.new_sender()
52+
53+
async def set(self, component_id: int, max_amps: float) -> None:
54+
"""Send the given current limit to the microgrid for the given component id.
55+
56+
Args:
57+
component_id: ID of EV Charger to set the current bounds to.
58+
max_amps: maximum current in amps, that an EV can draw from this EV Charger.
59+
"""
60+
await self._bounds_tx.send(ComponentCurrentLimit(component_id, max_amps))
61+
62+
def new_bounds_sender(self) -> Sender[ComponentCurrentLimit]:
63+
"""Return a `Sender` for setting EV Charger current bounds with.
64+
65+
Returns:
66+
A new `Sender`.
67+
"""
68+
return self._bounds_chan.new_sender()
69+
70+
async def stop(self) -> None:
71+
"""Stop the BoundsSetter."""
72+
await self._bounds_chan.close()
73+
await cancel_and_await(self._task)
74+
75+
async def _run(self) -> None:
76+
"""Wait for new bounds and forward them to the microgrid API.
77+
78+
Also, periodically resend the last set bounds to the microgrid API, if no new
79+
bounds have been set.
80+
81+
Raises:
82+
RuntimeError: If no meters are found in the component graph.
83+
ValueError: If the meter channel is closed.
84+
"""
85+
api_client = connection_manager.get().api_client
86+
graph = connection_manager.get().component_graph
87+
meters = graph.components(component_category={ComponentCategory.METER})
88+
if not meters:
89+
err = "No meters found in the component graph."
90+
logger.error(err)
91+
raise RuntimeError(err)
92+
93+
meter_data = (
94+
await api_client.meter_data(next(iter(meters)).component_id)
95+
).into_peekable()
96+
latest_bound: Dict[int, ComponentCurrentLimit] = {}
97+
98+
select = Select(
99+
bound_chan=self._bounds_rx,
100+
timer=Timer(self._repeat_interval.total_seconds()),
101+
)
102+
while await select.ready():
103+
meter = meter_data.peek()
104+
if meter is None:
105+
raise ValueError("Meter channel closed.")
106+
107+
if msg := select.bound_chan:
108+
bound: ComponentCurrentLimit = msg.inner
109+
if (
110+
bound.component_id in latest_bound
111+
and latest_bound[bound.component_id] == bound
112+
):
113+
continue
114+
latest_bound[bound.component_id] = bound
115+
min_voltage = min(meter.voltage_per_phase)
116+
logging.info("sending new bounds: %s", bound)
117+
await api_client.set_bounds(
118+
bound.component_id,
119+
0,
120+
bound.max_amps * min_voltage * self._NUM_PHASES,
121+
)
122+
elif msg := select.timer:
123+
for bound in latest_bound.values():
124+
min_voltage = min(meter.voltage_per_phase)
125+
logging.debug("resending bounds: %s", bound)
126+
await api_client.set_bounds(
127+
bound.component_id,
128+
0,
129+
bound.max_amps * min_voltage * self._NUM_PHASES,
130+
)

0 commit comments

Comments
 (0)