Skip to content

Commit c9fa07e

Browse files
committed
Add a EVChargerPool.set_bounds method for setting current bounds
This method would send the given current bound for the given EV Charger component ID, to the Microgrid API. It would also repeat the last bound to the microgrid, every configured interval. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent b5c8f3f commit c9fa07e

File tree

2 files changed

+143
-0
lines changed

2 files changed

+143
-0
lines changed

src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from asyncio import Task
1212
from collections import abc
1313
from dataclasses import dataclass
14+
from datetime import timedelta
1415

1516
from frequenz.channels import Broadcast, ChannelClosedError, Receiver, Sender
1617

@@ -24,6 +25,7 @@
2425
EVChargerPowerFormula,
2526
FormulaGeneratorConfig,
2627
)
28+
from ._set_current_bounds import BoundsSetter
2729
from ._state_tracker import EVChargerState, StateTracker
2830

2931
logger = logging.getLogger(__name__)
@@ -50,6 +52,7 @@ def __init__(
5052
channel_registry: ChannelRegistry,
5153
resampler_subscription_sender: Sender[ComponentMetricRequest],
5254
component_ids: set[int] | None = None,
55+
repeat_interval: timedelta = timedelta(seconds=3.0),
5356
) -> None:
5457
"""Create an `EVChargerPool` instance.
5558
@@ -61,8 +64,11 @@ def __init__(
6164
component_ids: An optional list of component_ids belonging to this pool. If
6265
not specified, IDs of all EV Chargers in the microgrid will be fetched
6366
from the component graph.
67+
repeat_interval: Interval after which to repeat the last set bounds to the
68+
microgrid API, if no new calls to `set_bounds` have been made.
6469
"""
6570
self._channel_registry: ChannelRegistry = channel_registry
71+
self._repeat_interval = repeat_interval
6672
self._resampler_subscription_sender: Sender[
6773
ComponentMetricRequest
6874
] = resampler_subscription_sender
@@ -87,6 +93,7 @@ def __init__(
8793
self._channel_registry,
8894
self._resampler_subscription_sender,
8995
)
96+
self._bounds_setter: BoundsSetter | None = None
9097

9198
@property
9299
def component_ids(self) -> abc.Set[int]:
@@ -158,6 +165,20 @@ async def component_data(self, component_id: int) -> Receiver[EVChargerData]:
158165

159166
return output_chan.new_receiver()
160167

168+
async def set_bounds(self, component_id: int, max_amps: float) -> None:
169+
"""Send given max current bound for the given EV Charger to the microgrid API.
170+
171+
Bounds are used to limit the max current drawn by an EV, although the exact
172+
value will be determined by the EV.
173+
174+
Args:
175+
component_id: ID of EV Charger to set the current bounds to.
176+
max_amps: Current bound value to set for the EV Charger.
177+
"""
178+
if not self._bounds_setter:
179+
self._bounds_setter = BoundsSetter(self._repeat_interval)
180+
await self._bounds_setter.set(component_id, max_amps)
181+
161182
async def _get_current_streams(
162183
self, component_id: int
163184
) -> tuple[Receiver[Sample], Receiver[Sample], Receiver[Sample]]:
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A task for sending EV Charger power bounds to the microgrid API."""
5+
6+
import asyncio
7+
import logging
8+
from dataclasses import dataclass
9+
from datetime import timedelta
10+
from typing import Dict
11+
12+
from frequenz.channels import Broadcast
13+
from frequenz.channels.util import Select, Timer
14+
15+
from ..._internal.asyncio import cancel_and_await
16+
from ...microgrid import connection_manager
17+
from ...microgrid.component import ComponentCategory
18+
19+
logger = logging.getLogger(__name__)
20+
21+
22+
@dataclass
23+
class ComponentCurrentLimit:
24+
"""A current limit, to be sent to the EV Charger."""
25+
26+
component_id: int
27+
max_amps: float
28+
29+
30+
class BoundsSetter:
31+
"""A task for sending EV Charger power bounds to the microgrid API.
32+
33+
Also, periodically resends the last set bounds to the microgrid API, if no new
34+
bounds have been set.
35+
"""
36+
37+
_NUM_PHASES = 3
38+
39+
def __init__(self, repeat_interval: timedelta) -> None:
40+
"""Create a `BoundsSetter` instance.
41+
42+
Args:
43+
repeat_interval: Interval after which to repeat the last set bounds to the
44+
microgrid API, if no new calls to `set_bounds` have been made.
45+
"""
46+
self._repeat_interval = repeat_interval
47+
48+
self._task: asyncio.Task[None] = asyncio.create_task(self._run())
49+
self._bounds_chan: Broadcast[ComponentCurrentLimit] = Broadcast("BoundsSetter")
50+
self._bounds_rx = self._bounds_chan.new_receiver()
51+
self._bounds_tx = self._bounds_chan.new_sender()
52+
53+
async def set(self, component_id: int, max_amps: float) -> None:
54+
"""Send the given current limit to the microgrid for the given component id.
55+
56+
Args:
57+
component_id: ID of EV Charger to set the current bounds to.
58+
max_amps: Current bound value to set for the EV Charger.
59+
"""
60+
await self._bounds_tx.send(ComponentCurrentLimit(component_id, max_amps))
61+
62+
async def stop(self) -> None:
63+
"""Stop the BoundsSetter."""
64+
await self._bounds_chan.close()
65+
await cancel_and_await(self._task)
66+
67+
async def _run(self) -> None:
68+
"""Wait for new bounds and forward them to the microgrid API.
69+
70+
Also, periodically resend the last set bounds to the microgrid API, if no new
71+
bounds have been set.
72+
73+
Raises:
74+
RuntimeError: If no meters are found in the component graph.
75+
ValueError: If the meter channel is closed.
76+
"""
77+
api_client = connection_manager.get().api_client
78+
graph = connection_manager.get().component_graph
79+
meters = graph.components(component_category={ComponentCategory.METER})
80+
if not meters:
81+
err = "No meters found in the component graph."
82+
logger.error(err)
83+
raise RuntimeError(err)
84+
85+
meter_data = (
86+
await api_client.meter_data(next(iter(meters)).component_id)
87+
).into_peekable()
88+
latest_bound: Dict[int, ComponentCurrentLimit] = {}
89+
90+
select = Select(
91+
bound_chan=self._bounds_rx,
92+
timer=Timer(self._repeat_interval.total_seconds()),
93+
)
94+
while await select.ready():
95+
meter = meter_data.peek()
96+
if meter is None:
97+
raise ValueError("Meter channel closed.")
98+
99+
if msg := select.bound_chan:
100+
bound: ComponentCurrentLimit = msg.inner
101+
if (
102+
bound.component_id in latest_bound
103+
and latest_bound[bound.component_id] == bound
104+
):
105+
continue
106+
latest_bound[bound.component_id] = bound
107+
min_voltage = min(meter.voltage_per_phase)
108+
logging.info("sending new bounds: %s", bound)
109+
await api_client.set_bounds(
110+
bound.component_id,
111+
0,
112+
bound.max_amps * min_voltage * self._NUM_PHASES,
113+
)
114+
elif msg := select.timer:
115+
for bound in latest_bound.values():
116+
min_voltage = min(meter.voltage_per_phase)
117+
logging.debug("resending bounds: %s", bound)
118+
await api_client.set_bounds(
119+
bound.component_id,
120+
0,
121+
bound.max_amps * min_voltage * self._NUM_PHASES,
122+
)

0 commit comments

Comments
 (0)