Skip to content

Commit 9df881a

Browse files
committed
Add a system bounds tracker for EV charger pools
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 3941ff3 commit 9df881a

File tree

1 file changed

+152
-0
lines changed

1 file changed

+152
-0
lines changed
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
# License: MIT
2+
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
3+
4+
"""System bounds tracker for the EV chargers."""
5+
6+
7+
import asyncio
8+
import logging
9+
from collections import abc
10+
11+
from frequenz.channels import Receiver, Sender, merge, select, selected_from
12+
from frequenz.client.microgrid import EVChargerData
13+
14+
from ...actor import BackgroundService
15+
from ...actor.power_distributing._component_status import ComponentPoolStatus
16+
from ...microgrid import connection_manager
17+
from .. import Power
18+
from .._base_types import Bounds, SystemBounds
19+
20+
_logger = logging.getLogger(__name__)
21+
22+
23+
class EVCSystemBoundsTracker(BackgroundService):
24+
"""Track the system bounds for the EV chargers.
25+
26+
System bounds are the aggregate bounds for the EV chargers in the pool that are
27+
working and have an EV attached to them. They are calculated from the individual
28+
bounds received from the microgrid API.
29+
30+
The system bounds are sent to the `bounds_sender` whenever they change.
31+
"""
32+
33+
def __init__(
34+
self,
35+
component_ids: abc.Set[int],
36+
status_receiver: Receiver[ComponentPoolStatus],
37+
bounds_sender: Sender[SystemBounds],
38+
):
39+
"""Initialize the system bounds tracker.
40+
41+
Args:
42+
component_ids: The ids of the components to track.
43+
status_receiver: A receiver that streams the status of the EV Chargers in
44+
the pool.
45+
bounds_sender: A sender to send the system bounds to.
46+
"""
47+
super().__init__()
48+
49+
self._component_ids = component_ids
50+
self._status_receiver = status_receiver
51+
self._bounds_sender = bounds_sender
52+
self._latest_component_data: dict[int, EVChargerData] = {}
53+
self._last_sent_bounds: SystemBounds | None = None
54+
self._component_pool_status = ComponentPoolStatus(set(), set())
55+
56+
def start(self) -> None:
57+
"""Start the EV charger system bounds tracker."""
58+
self._tasks.add(asyncio.create_task(self._run_forever()))
59+
60+
async def _send_bounds(self) -> None:
61+
"""Calculate and send the aggregate system bounds if they have changed."""
62+
if not self._latest_component_data:
63+
return
64+
inclusion_bounds = Bounds(
65+
lower=Power.from_watts(
66+
sum(
67+
data.active_power_inclusion_lower_bound
68+
for data in self._latest_component_data.values()
69+
)
70+
),
71+
upper=Power.from_watts(
72+
sum(
73+
data.active_power_inclusion_upper_bound
74+
for data in self._latest_component_data.values()
75+
)
76+
),
77+
)
78+
exclusion_bounds = Bounds(
79+
lower=Power.from_watts(
80+
sum(
81+
data.active_power_exclusion_lower_bound
82+
for data in self._latest_component_data.values()
83+
)
84+
),
85+
upper=Power.from_watts(
86+
sum(
87+
data.active_power_exclusion_upper_bound
88+
for data in self._latest_component_data.values()
89+
)
90+
),
91+
)
92+
93+
if (
94+
self._last_sent_bounds is None
95+
or inclusion_bounds != self._last_sent_bounds.inclusion_bounds
96+
or exclusion_bounds != self._last_sent_bounds.exclusion_bounds
97+
):
98+
self._last_sent_bounds = SystemBounds(
99+
timestamp=max(
100+
msg.timestamp for msg in self._latest_component_data.values()
101+
),
102+
inclusion_bounds=inclusion_bounds,
103+
exclusion_bounds=exclusion_bounds,
104+
)
105+
await self._bounds_sender.send(self._last_sent_bounds)
106+
107+
async def _run_forever(self) -> None:
108+
"""Run the status tracker forever."""
109+
while True:
110+
try:
111+
await self._run()
112+
except Exception: # pylint: disable=broad-except
113+
_logger.exception(
114+
"Restarting after exception in EVChargerSystemBoundsTracker.run()"
115+
)
116+
await asyncio.sleep(1.0)
117+
118+
async def _run(self) -> None:
119+
"""Run the system bounds tracker."""
120+
api_client = connection_manager.get().api_client
121+
status_rx = self._status_receiver
122+
ev_data_rx = merge(
123+
*(
124+
await asyncio.gather(
125+
*[api_client.ev_charger_data(cid) for cid in self._component_ids]
126+
)
127+
)
128+
)
129+
130+
async for selected in select(status_rx, ev_data_rx):
131+
if selected_from(selected, status_rx):
132+
self._component_pool_status = selected.message
133+
to_remove = []
134+
for comp_id in self._latest_component_data:
135+
if (
136+
comp_id not in self._component_pool_status.working
137+
and comp_id not in self._component_pool_status.uncertain
138+
):
139+
to_remove.append(comp_id)
140+
for comp_id in to_remove:
141+
del self._latest_component_data[comp_id]
142+
elif selected_from(selected, ev_data_rx):
143+
data = selected.message
144+
comp_id = data.component_id
145+
if (
146+
comp_id not in self._component_pool_status.working
147+
and comp_id not in self._component_pool_status.uncertain
148+
):
149+
continue
150+
self._latest_component_data[data.component_id] = data
151+
152+
await self._send_bounds()

0 commit comments

Comments
 (0)