66from __future__ import annotations
77
88import asyncio
9- from collections .abc import Iterator
10- from dataclasses import dataclass
119from enum import Enum
1210from typing import Optional
1311
14- from frequenz .channels import Broadcast , Receiver
12+ from frequenz .channels import Receiver
1513from frequenz .channels .util import Merge
1614
1715from frequenz .sdk import microgrid
@@ -62,44 +60,6 @@ def from_ev_charger_data(cls, data: EVChargerData) -> EVChargerState:
6260 return EVChargerState .IDLE
6361
6462
65- @dataclass (frozen = True )
66- class EVChargerPoolStates :
67- """States of all EV Chargers in the pool."""
68-
69- _states : dict [int , EVChargerState ]
70- _changed_component : Optional [int ] = None
71-
72- def __iter__ (self ) -> Iterator [tuple [int , EVChargerState ]]:
73- """Iterate over states of all EV Chargers.
74-
75- Returns:
76- An iterator over all EV Charger states.
77- """
78- return iter (self ._states .items ())
79-
80- def latest_change (self ) -> Optional [tuple [int , EVChargerState ]]:
81- """Return the most recent EV Charger state change.
82-
83- The first `EVChargerPoolStates` instance created by a `StateTracker` will just
84- be a representation of the states of all EV Chargers. At that point, the most
85- recent change in state of an ev charger will be unknown, so this function will
86- return `None`.
87-
88- Returns:
89- None, when the most recent change is unknown. Otherwise, a tuple with
90- the component ID of an EV Charger that just had a state change, and its
91- new state.
92- """
93- if self ._changed_component is None :
94- return None
95- return (
96- self ._changed_component ,
97- self ._states .setdefault (
98- self ._changed_component , EVChargerState .UNSPECIFIED
99- ),
100- )
101-
102-
10363class StateTracker :
10464 """A class for keeping track of the states of all EV Chargers in a pool."""
10565
@@ -110,74 +70,50 @@ def __init__(self, component_ids: set[int]) -> None:
11070 component_ids: EV Charger component ids to track the states of.
11171 """
11272 self ._component_ids = component_ids
113- self ._channel = Broadcast [EVChargerPoolStates ](
114- "EVCharger States" , resend_latest = True
115- )
116- self ._task : Optional [asyncio .Task [None ]] = None
73+ self ._task : asyncio .Task [None ] = asyncio .create_task (self ._run ())
11774 self ._merged_stream : Optional [Merge [EVChargerData ]] = None
118- self ._states : dict [int , EVChargerState ] = {}
11975
120- def _get (self ) -> EVChargerPoolStates :
121- """Get a representation of the current states of all EV Chargers.
76+ # Initialize all components to the `MISSING` state. This will change as data
77+ # starts arriving from the individual components.
78+ self ._states : dict [int , EVChargerState ] = {
79+ component_id : EVChargerState .MISSING for component_id in component_ids
80+ }
81+
82+ def get (self , component_id : int ) -> EVChargerState :
83+ """Return the current state of the EV Charger with the given component ID.
84+
85+ Args:
86+ component_id: id of the EV Charger whose state is being fetched.
12287
12388 Returns:
124- An `EVChargerPoolStates` instance .
89+ An `EVChargerState` value corresponding to the given component id .
12590 """
126- return EVChargerPoolStates ( self ._states )
91+ return self ._states [ component_id ]
12792
12893 def _update (
12994 self ,
13095 data : EVChargerData ,
131- ) -> Optional [ EVChargerPoolStates ] :
96+ ) -> None :
13297 """Update the state of an EV Charger, from a new data point.
13398
13499 Args:
135100 data: component data from the microgrid, for an EV Charger in the pool.
136-
137- Returns:
138- A new `EVChargerPoolStates` instance representing all the EV Chargers in
139- the pool, in case there has been a state change for any of the EV
140- Chargers, or `None` otherwise.
141101 """
142102 evc_id = data .component_id
143103 new_state = EVChargerState .from_ev_charger_data (data )
144- if evc_id not in self ._states or self ._states [evc_id ] != new_state :
145- self ._states [evc_id ] = new_state
146- return EVChargerPoolStates (self ._states , evc_id )
147- return None
104+ self ._states [evc_id ] = new_state
148105
149106 async def _run (self ) -> None :
150107 api_client = microgrid .connection_manager .get ().api_client
151108 streams : list [Receiver [EVChargerData ]] = await asyncio .gather (
152109 * [api_client .ev_charger_data (cid ) for cid in self ._component_ids ]
153110 )
154-
155- # Start with the `MISSING` state for all components. This will change as data
156- # starts arriving from the individual components.
157- self ._states = {
158- component_id : EVChargerState .MISSING for component_id in self ._component_ids
159- }
160111 self ._merged_stream = Merge (* streams )
161- sender = self ._channel .new_sender ()
162- await sender .send (self ._get ())
163112 async for data in self ._merged_stream :
164- if updated_states := self ._update (data ):
165- await sender .send (updated_states )
166-
167- def new_receiver (self ) -> Receiver [EVChargerPoolStates ]:
168- """Return a receiver that streams ev charger states.
169-
170- Returns:
171- A receiver that streams the states of all EV Chargers in the pool, every
172- time the states of any of them change.
173- """
174- if self ._task is None or self ._task .done ():
175- self ._task = asyncio .create_task (self ._run ())
176- return self ._channel .new_receiver ()
113+ self ._update (data )
177114
178115 async def stop (self ) -> None :
179116 """Stop the status tracker."""
180- if self ._task :
181- await cancel_and_await (self ._task )
117+ await cancel_and_await (self ._task )
182118 if self ._merged_stream :
183119 await self ._merged_stream .stop ()
0 commit comments