55
66from __future__ import annotations
77
8+ import asyncio
89import logging
910import uuid
11+ from asyncio import Task
12+ from dataclasses import dataclass
1013
11- from frequenz .channels import Sender
14+ from frequenz .channels import Broadcast , ChannelClosedError , Receiver , Sender
1215
1316from ...actor import ChannelRegistry , ComponentMetricRequest
1417from ...microgrid import connection_manager
15- from ...microgrid .component import ComponentCategory
18+ from ...microgrid .component import ComponentCategory , ComponentMetricId
19+ from .. import Sample , Sample3Phase
1620from .._formula_engine import FormulaEnginePool , FormulaReceiver , FormulaReceiver3Phase
1721from .._formula_engine ._formula_generators import (
1822 EVChargerCurrentFormula ,
1923 EVChargerPowerFormula ,
2024 FormulaGeneratorConfig ,
2125)
26+ from ._state_tracker import EVChargerState , StateTracker
2227
2328logger = logging .getLogger (__name__ )
2429
@@ -27,6 +32,15 @@ class EVChargerPoolError(Exception):
2732 """An error that occurred in any of the EVChargerPool methods."""
2833
2934
35+ @dataclass (frozen = True )
36+ class EVChargerData :
37+ """Data for an EV Charger, including the 3-phase current and the component state."""
38+
39+ component_id : int
40+ current : Sample3Phase
41+ state : EVChargerState
42+
43+
3044class EVChargerPool :
3145 """Interactions with EV Chargers."""
3246
@@ -62,6 +76,10 @@ def __init__(
6276 component_category = {ComponentCategory .EV_CHARGER }
6377 )
6478 }
79+ self ._state_tracker : StateTracker | None = None
80+ self ._status_streams : dict [
81+ int , tuple [Task [None ], Broadcast [EVChargerData ]]
82+ ] = {}
6583 self ._namespace : str = f"ev-charger-pool-{ uuid .uuid4 ()} "
6684 self ._formula_pool : FormulaEnginePool = FormulaEnginePool (
6785 self ._namespace ,
@@ -148,3 +166,115 @@ async def power(self, component_id: int) -> FormulaReceiver:
148166 EVChargerPowerFormula ,
149167 FormulaGeneratorConfig (component_ids = {component_id }),
150168 )
169+
170+ async def component_data (self , component_id : int ) -> Receiver [EVChargerData ]:
171+ """Stream 3-phase current values and state of an EV Charger.
172+
173+ Args:
174+ component_id: id of the EV Charger for which data is requested.
175+
176+ Returns:
177+ A receiver that streams objects containing 3-phase current and state of
178+ an EV Charger.
179+ """
180+ if recv := self ._status_streams .get (component_id , None ):
181+ task , output_chan = recv
182+ if not task .done ():
183+ return output_chan .new_receiver ()
184+ logger .warning ("Restarting component_status for id: %s" , component_id )
185+ else :
186+ output_chan = Broadcast [EVChargerData ](
187+ f"evpool-component_status-{ component_id } "
188+ )
189+
190+ task = asyncio .create_task (
191+ self ._stream_component_data (component_id , output_chan .new_sender ())
192+ )
193+
194+ self ._status_streams [component_id ] = (task , output_chan )
195+
196+ return output_chan .new_receiver ()
197+
198+ async def _get_current_streams (
199+ self , component_id : int
200+ ) -> tuple [Receiver [Sample ], Receiver [Sample ], Receiver [Sample ]]:
201+ """Fetch current streams from the resampler for each phase.
202+
203+ Args:
204+ component_id: id of EV Charger for which current streams are being fetched.
205+
206+ Returns:
207+ A tuple of 3 receivers stream resampled current values for the given
208+ component id, one for each phase.
209+ """
210+
211+ async def resampler_subscribe (metric_id : ComponentMetricId ) -> Receiver [Sample ]:
212+ request = ComponentMetricRequest (
213+ namespace = "ev-pool" ,
214+ component_id = component_id ,
215+ metric_id = metric_id ,
216+ start_time = None ,
217+ )
218+ await self ._resampler_subscription_sender .send (request )
219+ return self ._channel_registry .new_receiver (request .get_channel_name ())
220+
221+ return (
222+ await resampler_subscribe (ComponentMetricId .CURRENT_PHASE_1 ),
223+ await resampler_subscribe (ComponentMetricId .CURRENT_PHASE_2 ),
224+ await resampler_subscribe (ComponentMetricId .CURRENT_PHASE_3 ),
225+ )
226+
227+ async def _stream_component_data (
228+ self ,
229+ component_id : int ,
230+ sender : Sender [EVChargerData ],
231+ ) -> None :
232+ """Stream 3-phase current values and state of an EV Charger.
233+
234+ Args:
235+ component_id: id of the EV Charger for which data is requested.
236+ sender: A sender to stream EV Charger data to.
237+
238+ Raises:
239+ ChannelClosedError: If the channels from the resampler are closed.
240+ """
241+ if not self ._state_tracker :
242+ self ._state_tracker = StateTracker (self ._component_ids )
243+
244+ (phase_1_rx , phase_2_rx , phase_3_rx ) = await self ._get_current_streams (
245+ component_id
246+ )
247+ while True :
248+ try :
249+ (phase_1 , phase_2 , phase_3 ) = (
250+ await phase_1_rx .receive (),
251+ await phase_2_rx .receive (),
252+ await phase_3_rx .receive (),
253+ )
254+ except ChannelClosedError :
255+ logger .exception ("Streams closed for component_id=%s." , component_id )
256+ raise
257+
258+ sample = Sample3Phase (
259+ timestamp = phase_1 .timestamp ,
260+ value_p1 = phase_1 .value ,
261+ value_p2 = phase_2 .value ,
262+ value_p3 = phase_3 .value ,
263+ )
264+
265+ if (
266+ phase_1 .value is None
267+ and phase_2 .value is None
268+ and phase_3 .value is None
269+ ):
270+ state = EVChargerState .MISSING
271+ else :
272+ state = self ._state_tracker .get (component_id )
273+
274+ await sender .send (
275+ EVChargerData (
276+ component_id = component_id ,
277+ current = sample ,
278+ state = state ,
279+ )
280+ )
0 commit comments