55
66from __future__ import annotations
77
8+ import asyncio
89import logging
910import uuid
11+ from asyncio import Task
12+ from collections import abc
13+ from dataclasses import dataclass
1014
11- from frequenz .channels import Sender
15+ from frequenz .channels import Broadcast , ChannelClosedError , Receiver , Sender
1216
1317from ...actor import ChannelRegistry , ComponentMetricRequest
1418from ...microgrid import connection_manager
15- from ...microgrid .component import ComponentCategory
19+ from ...microgrid .component import ComponentCategory , ComponentMetricId
20+ from .. import Sample , Sample3Phase
1621from .._formula_engine import FormulaEnginePool , FormulaReceiver , FormulaReceiver3Phase
1722from .._formula_engine ._formula_generators import (
1823 EVChargerCurrentFormula ,
1924 EVChargerPowerFormula ,
2025 FormulaGeneratorConfig ,
2126)
27+ from ._state_tracker import EVChargerState , StateTracker
2228
2329logger = logging .getLogger (__name__ )
2430
@@ -27,6 +33,15 @@ class EVChargerPoolError(Exception):
2733 """An error that occurred in any of the EVChargerPool methods."""
2834
2935
36+ @dataclass (frozen = True )
37+ class EVChargerData :
38+ """Data for an EV Charger, including the 3-phase current and the component state."""
39+
40+ component_id : int
41+ current : Sample3Phase
42+ state : EVChargerState
43+
44+
3045class EVChargerPool :
3146 """Interactions with EV Chargers."""
3247
@@ -62,14 +77,27 @@ def __init__(
6277 component_category = {ComponentCategory .EV_CHARGER }
6378 )
6479 }
80+ self ._state_tracker : StateTracker | None = None
81+ self ._status_streams : dict [
82+ int , tuple [Task [None ], Broadcast [EVChargerData ]]
83+ ] = {}
6584 self ._namespace : str = f"ev-charger-pool-{ uuid .uuid4 ()} "
6685 self ._formula_pool : FormulaEnginePool = FormulaEnginePool (
6786 self ._namespace ,
6887 self ._channel_registry ,
6988 self ._resampler_subscription_sender ,
7089 )
7190
72- async def total_current (self ) -> FormulaReceiver3Phase :
91+ @property
92+ def component_ids (self ) -> abc .Set [int ]:
93+ """Return component IDs of all EV Chargers managed by this EVChargerPool.
94+
95+ Returns:
96+ Set of managed component IDs.
97+ """
98+ return self ._component_ids
99+
100+ async def current (self ) -> FormulaReceiver3Phase :
73101 """Fetch the total current for the EV Chargers in the pool.
74102
75103 If a formula engine to calculate EV Charger current is not already running, it
@@ -85,7 +113,7 @@ async def total_current(self) -> FormulaReceiver3Phase:
85113 FormulaGeneratorConfig (component_ids = self ._component_ids ),
86114 )
87115
88- async def total_power (self ) -> FormulaReceiver :
116+ async def power (self ) -> FormulaReceiver :
89117 """Fetch the total power for the EV Chargers in the pool.
90118
91119 If a formula engine to calculate EV Charger power is not already running, it
@@ -102,49 +130,114 @@ async def total_power(self) -> FormulaReceiver:
102130 FormulaGeneratorConfig (component_ids = self ._component_ids ),
103131 )
104132
105- async def current (self , component_id : int ) -> FormulaReceiver3Phase :
106- """Fetch the 3-phase current for the given EV Charger id .
133+ async def component_data (self , component_id : int ) -> Receiver [ EVChargerData ] :
134+ """Stream 3-phase current values and state of an EV Charger.
107135
108136 Args:
109- component_id: id of the EV Charger to stream current values for .
137+ component_id: id of the EV Charger for which data is requested .
110138
111139 Returns:
112- A *new* receiver that will stream 3-phase current values for the given
113- EV Charger.
114-
115- Raises:
116- EVChargerPoolError: if the given component_id is not part of the pool.
140+ A receiver that streams objects containing 3-phase current and state of
141+ an EV Charger.
117142 """
118- if component_id not in self ._component_ids :
119- raise EVChargerPoolError (
120- f"{ component_id = } is not part of the EVChargerPool"
121- f" (with ids={ self ._component_ids } )"
143+ if recv := self ._status_streams .get (component_id , None ):
144+ task , output_chan = recv
145+ if not task .done ():
146+ return output_chan .new_receiver ()
147+ logger .warning ("Restarting component_status for id: %s" , component_id )
148+ else :
149+ output_chan = Broadcast [EVChargerData ](
150+ f"evpool-component_status-{ component_id } "
122151 )
123- return await self ._formula_pool .from_generator (
124- f"ev_charger_current_{ component_id } " ,
125- EVChargerCurrentFormula ,
126- FormulaGeneratorConfig (component_ids = {component_id }),
152+
153+ task = asyncio .create_task (
154+ self ._stream_component_data (component_id , output_chan .new_sender ())
127155 )
128156
129- async def power (self , component_id : int ) -> FormulaReceiver :
130- """Fetch the power for the given EV Charger id.
157+ self ._status_streams [component_id ] = (task , output_chan )
158+
159+ return output_chan .new_receiver ()
160+
161+ async def _get_current_streams (
162+ self , component_id : int
163+ ) -> tuple [Receiver [Sample ], Receiver [Sample ], Receiver [Sample ]]:
164+ """Fetch current streams from the resampler for each phase.
131165
132166 Args:
133- component_id: id of the EV Charger to stream power values for .
167+ component_id: id of EV Charger for which current streams are being fetched .
134168
135169 Returns:
136- A *new* receiver that will stream power values for the given EV Charger.
170+ A tuple of 3 receivers stream resampled current values for the given
171+ component id, one for each phase.
172+ """
173+
174+ async def resampler_subscribe (metric_id : ComponentMetricId ) -> Receiver [Sample ]:
175+ request = ComponentMetricRequest (
176+ namespace = "ev-pool" ,
177+ component_id = component_id ,
178+ metric_id = metric_id ,
179+ start_time = None ,
180+ )
181+ await self ._resampler_subscription_sender .send (request )
182+ return self ._channel_registry .new_receiver (request .get_channel_name ())
183+
184+ return (
185+ await resampler_subscribe (ComponentMetricId .CURRENT_PHASE_1 ),
186+ await resampler_subscribe (ComponentMetricId .CURRENT_PHASE_2 ),
187+ await resampler_subscribe (ComponentMetricId .CURRENT_PHASE_3 ),
188+ )
189+
190+ async def _stream_component_data (
191+ self ,
192+ component_id : int ,
193+ sender : Sender [EVChargerData ],
194+ ) -> None :
195+ """Stream 3-phase current values and state of an EV Charger.
196+
197+ Args:
198+ component_id: id of the EV Charger for which data is requested.
199+ sender: A sender to stream EV Charger data to.
137200
138201 Raises:
139- EVChargerPoolError: if the given component_id is not part of the pool .
202+ ChannelClosedError: If the channels from the resampler are closed .
140203 """
141- if component_id not in self ._component_ids :
142- raise EVChargerPoolError (
143- f"{ component_id = } is not part of the EVChargerPool"
144- f" (with ids={ self ._component_ids } )"
145- )
146- return await self ._formula_pool .from_generator (
147- f"ev_charger_current_{ component_id } " ,
148- EVChargerPowerFormula ,
149- FormulaGeneratorConfig (component_ids = {component_id }),
204+ if not self ._state_tracker :
205+ self ._state_tracker = StateTracker (self ._component_ids )
206+
207+ (phase_1_rx , phase_2_rx , phase_3_rx ) = await self ._get_current_streams (
208+ component_id
150209 )
210+ while True :
211+ try :
212+ (phase_1 , phase_2 , phase_3 ) = (
213+ await phase_1_rx .receive (),
214+ await phase_2_rx .receive (),
215+ await phase_3_rx .receive (),
216+ )
217+ except ChannelClosedError :
218+ logger .exception ("Streams closed for component_id=%s." , component_id )
219+ raise
220+
221+ sample = Sample3Phase (
222+ timestamp = phase_1 .timestamp ,
223+ value_p1 = phase_1 .value ,
224+ value_p2 = phase_2 .value ,
225+ value_p3 = phase_3 .value ,
226+ )
227+
228+ if (
229+ phase_1 .value is None
230+ and phase_2 .value is None
231+ and phase_3 .value is None
232+ ):
233+ state = EVChargerState .MISSING
234+ else :
235+ state = self ._state_tracker .get (component_id )
236+
237+ await sender .send (
238+ EVChargerData (
239+ component_id = component_id ,
240+ current = sample ,
241+ state = state ,
242+ )
243+ )
0 commit comments