44"""Interactions with pools of EV Chargers."""
55
66
7- import asyncio
8- import logging
97import uuid
10- from asyncio import Task
118from collections import abc
12- from dataclasses import dataclass
13- from datetime import timedelta
149
15- from frequenz .channels import Broadcast , ChannelClosedError , Receiver , Sender
16- from frequenz .client .microgrid import ComponentCategory , ComponentMetricId
10+ from frequenz .channels import Sender
11+ from frequenz .client .microgrid import ComponentCategory
1712
18- from ..._internal ._asyncio import cancel_and_await
1913from ...actor import ChannelRegistry , ComponentMetricRequest
2014from ...microgrid import connection_manager
21- from .. import Sample , Sample3Phase
22- from .._quantities import Current , Power , Quantity
15+ from .._quantities import Current , Power
2316from ..formula_engine import FormulaEngine , FormulaEngine3Phase
2417from ..formula_engine ._formula_engine_pool import FormulaEnginePool
2518from ..formula_engine ._formula_generators import (
2619 EVChargerCurrentFormula ,
2720 EVChargerPowerFormula ,
2821 FormulaGeneratorConfig ,
2922)
30- from ._set_current_bounds import BoundsSetter , ComponentCurrentLimit
31- from ._state_tracker import EVChargerState , StateTracker
32-
33- _logger = logging .getLogger (__name__ )
3423
3524
3625class EVChargerPoolError (Exception ):
3726 """An error that occurred in any of the EVChargerPool methods."""
3827
3928
40- @dataclass (frozen = True )
41- class EVChargerData :
42- """Data for an EV Charger, including the 3-phase current and the component state."""
43-
44- component_id : int
45- """The component ID of the EV Charger."""
46-
47- current : Sample3Phase [Current ]
48- """The 3-phase current of the EV Charger."""
49-
50- state : EVChargerState
51- """The state of the EV Charger."""
52-
53-
5429class EVChargerPool :
5530 """An interface for interaction with pools of EV Chargers.
5631
@@ -64,21 +39,13 @@ class EVChargerPool:
6439 and 3-phase
6540 [`current`][frequenz.sdk.timeseries.ev_charger_pool.EVChargerPool.current]
6641 measurements of the EV Chargers in the pool.
67- - The
68- [`component_data`][frequenz.sdk.timeseries.ev_charger_pool.EVChargerPool.component_data]
69- method for fetching the 3-phase current and state of individual EV Chargers in
70- the pool.
71- - The
72- [`set_bounds`][frequenz.sdk.timeseries.ev_charger_pool.EVChargerPool.set_bounds]
73- method for limiting the max current of individual EV Chargers in the pool.
7442 """
7543
7644 def __init__ (
7745 self ,
7846 channel_registry : ChannelRegistry ,
7947 resampler_subscription_sender : Sender [ComponentMetricRequest ],
8048 component_ids : abc .Set [int ] | None = None ,
81- repeat_interval : timedelta = timedelta (seconds = 3.0 ),
8249 ) -> None :
8350 """Create an `EVChargerPool` instance.
8451
@@ -95,11 +62,8 @@ def __init__(
9562 component_ids: An optional list of component_ids belonging to this pool. If
9663 not specified, IDs of all EV Chargers in the microgrid will be fetched
9764 from the component graph.
98- repeat_interval: Interval after which to repeat the last set bounds to the
99- microgrid API, if no new calls to `set_bounds` have been made.
10065 """
10166 self ._channel_registry : ChannelRegistry = channel_registry
102- self ._repeat_interval : timedelta = repeat_interval
10367 self ._resampler_subscription_sender : Sender [ComponentMetricRequest ] = (
10468 resampler_subscription_sender
10569 )
@@ -114,17 +78,12 @@ def __init__(
11478 component_categories = {ComponentCategory .EV_CHARGER }
11579 )
11680 }
117- self ._state_tracker : StateTracker | None = None
118- self ._status_streams : dict [int , tuple [Task [None ], Broadcast [EVChargerData ]]] = (
119- {}
120- )
12181 self ._namespace : str = f"ev-charger-pool-{ uuid .uuid4 ()} "
12282 self ._formula_pool : FormulaEnginePool = FormulaEnginePool (
12383 self ._namespace ,
12484 self ._channel_registry ,
12585 self ._resampler_subscription_sender ,
12686 )
127- self ._bounds_setter : BoundsSetter | None = None
12887
12988 @property
13089 def component_ids (self ) -> abc .Set [int ]:
@@ -185,171 +144,6 @@ def power(self) -> FormulaEngine[Power]:
185144 assert isinstance (engine , FormulaEngine )
186145 return engine
187146
188- def component_data (self , component_id : int ) -> Receiver [EVChargerData ]:
189- """Stream 3-phase current values and state of an EV Charger.
190-
191- Args:
192- component_id: id of the EV Charger for which data is requested.
193-
194- Returns:
195- A receiver that streams objects containing 3-phase current and state of
196- an EV Charger.
197- """
198- if recv := self ._status_streams .get (component_id , None ):
199- task , output_chan = recv
200- if not task .done ():
201- return output_chan .new_receiver ()
202- _logger .warning ("Restarting component_status for id: %s" , component_id )
203- else :
204- output_chan = Broadcast [EVChargerData ](
205- name = f"evpool-component_status-{ component_id } "
206- )
207-
208- task = asyncio .create_task (
209- self ._stream_component_data (component_id , output_chan .new_sender ())
210- )
211-
212- self ._status_streams [component_id ] = (task , output_chan )
213-
214- return output_chan .new_receiver ()
215-
216- async def set_bounds (self , component_id : int , max_current : Current ) -> None :
217- """Send given max current bound for the given EV Charger to the microgrid API.
218-
219- Bounds are used to limit the max current drawn by an EV, although the exact
220- value will be determined by the EV.
221-
222- Args:
223- component_id: ID of EV Charger to set the current bounds to.
224- max_current: maximum current that an EV can draw from this EV Charger.
225- """
226- if not self ._bounds_setter :
227- self ._bounds_setter = BoundsSetter (self ._repeat_interval )
228- await self ._bounds_setter .set (component_id , max_current .as_amperes ())
229-
230- def new_bounds_sender (self ) -> Sender [ComponentCurrentLimit ]:
231- """Return a `Sender` for setting EV Charger current bounds with.
232-
233- Bounds are used to limit the max current drawn by an EV, although the exact
234- value will be determined by the EV.
235-
236- Returns:
237- A new `Sender`.
238- """
239- if not self ._bounds_setter :
240- self ._bounds_setter = BoundsSetter (self ._repeat_interval )
241- return self ._bounds_setter .new_bounds_sender ()
242-
243147 async def stop (self ) -> None :
244148 """Stop all tasks and channels owned by the EVChargerPool."""
245- if self ._bounds_setter :
246- await self ._bounds_setter .stop ()
247- if self ._state_tracker :
248- await self ._state_tracker .stop ()
249149 await self ._formula_pool .stop ()
250- for stream in self ._status_streams .values ():
251- task , chan = stream
252- await chan .close ()
253- await cancel_and_await (task )
254-
255- async def _get_current_streams (self , component_id : int ) -> tuple [
256- Receiver [Sample [Quantity ]],
257- Receiver [Sample [Quantity ]],
258- Receiver [Sample [Quantity ]],
259- ]:
260- """Fetch current streams from the resampler for each phase.
261-
262- Args:
263- component_id: id of EV Charger for which current streams are being fetched.
264-
265- Returns:
266- A tuple of 3 receivers stream resampled current values for the given
267- component id, one for each phase.
268- """
269-
270- async def resampler_subscribe (
271- metric_id : ComponentMetricId ,
272- ) -> Receiver [Sample [Quantity ]]:
273- request = ComponentMetricRequest (
274- namespace = "ev-pool" ,
275- component_id = component_id ,
276- metric_id = metric_id ,
277- start_time = None ,
278- )
279- await self ._resampler_subscription_sender .send (request )
280- return self ._channel_registry .get_or_create (
281- Sample [Quantity ], request .get_channel_name ()
282- ).new_receiver ()
283-
284- return (
285- await resampler_subscribe (ComponentMetricId .CURRENT_PHASE_1 ),
286- await resampler_subscribe (ComponentMetricId .CURRENT_PHASE_2 ),
287- await resampler_subscribe (ComponentMetricId .CURRENT_PHASE_3 ),
288- )
289-
290- async def _stream_component_data (
291- self ,
292- component_id : int ,
293- sender : Sender [EVChargerData ],
294- ) -> None :
295- """Stream 3-phase current values and state of an EV Charger.
296-
297- Args:
298- component_id: id of the EV Charger for which data is requested.
299- sender: A sender to stream EV Charger data to.
300-
301- Raises:
302- ChannelClosedError: If the channels from the resampler are closed.
303- """
304- if not self ._state_tracker :
305- self ._state_tracker = StateTracker (self ._component_ids )
306-
307- (phase_1_rx , phase_2_rx , phase_3_rx ) = await self ._get_current_streams (
308- component_id
309- )
310- while True :
311- try :
312- (phase_1 , phase_2 , phase_3 ) = (
313- await phase_1_rx .receive (),
314- await phase_2_rx .receive (),
315- await phase_3_rx .receive (),
316- )
317- except ChannelClosedError :
318- _logger .exception ("Streams closed for component_id=%s." , component_id )
319- raise
320-
321- sample = Sample3Phase (
322- timestamp = phase_1 .timestamp ,
323- value_p1 = (
324- None
325- if phase_1 .value is None
326- else Current .from_amperes (phase_1 .value .base_value )
327- ),
328- value_p2 = (
329- None
330- if phase_2 .value is None
331- else Current .from_amperes (phase_2 .value .base_value )
332- ),
333- value_p3 = (
334- None
335- if phase_3 .value is None
336- else Current .from_amperes (phase_3 .value .base_value )
337- ),
338- )
339-
340- if (
341- phase_1 .value is None
342- and phase_2 .value is None
343- and phase_3 .value is None
344- ):
345- state = EVChargerState .MISSING
346- else :
347- state = self ._state_tracker .get (component_id )
348-
349- await sender .send (
350- EVChargerData (
351- component_id = component_id ,
352- current = sample ,
353- state = state ,
354- )
355- )
0 commit comments