Skip to content

Commit bb54b63

Browse files
committed
Remove support for streaming per-component data from EVChargerPool
Also remove per-component status tracking support. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 9d4a8e1 commit bb54b63

File tree

4 files changed

+5
-483
lines changed

4 files changed

+5
-483
lines changed

src/frequenz/sdk/timeseries/ev_charger_pool/__init__.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,11 @@
33

44
"""Interactions with EV Chargers."""
55

6-
from ._ev_charger_pool import EVChargerData, EVChargerPool, EVChargerPoolError
6+
from ._ev_charger_pool import EVChargerPool, EVChargerPoolError
77
from ._set_current_bounds import ComponentCurrentLimit
8-
from ._state_tracker import EVChargerState
98

109
__all__ = [
1110
"ComponentCurrentLimit",
1211
"EVChargerPool",
13-
"EVChargerData",
1412
"EVChargerPoolError",
15-
"EVChargerState",
1613
]

src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py

Lines changed: 3 additions & 166 deletions
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,19 @@
44
"""Interactions with pools of EV Chargers."""
55

66

7-
import asyncio
8-
import logging
97
import uuid
10-
from asyncio import Task
118
from collections import abc
12-
from dataclasses import dataclass
139
from datetime import timedelta
1410

15-
from frequenz.channels import Broadcast, ChannelClosedError, Receiver, Sender
11+
from frequenz.channels import Broadcast, Receiver, Sender
1612

17-
from ..._internal._asyncio import cancel_and_await
1813
from ..._internal._channels import ReceiverFetcher
1914
from ...actor import ChannelRegistry, ComponentMetricRequest
2015
from ...actor.power_distributing import ComponentPoolStatus
2116
from ...microgrid import connection_manager
22-
from ...microgrid.component import ComponentCategory, ComponentMetricId
23-
from .. import Sample, Sample3Phase
17+
from ...microgrid.component import ComponentCategory
2418
from .._base_types import SystemBounds
25-
from .._quantities import Current, Power, Quantity
19+
from .._quantities import Current, Power
2620
from ..formula_engine import FormulaEngine, FormulaEngine3Phase
2721
from ..formula_engine._formula_engine_pool import FormulaEnginePool
2822
from ..formula_engine._formula_generators import (
@@ -31,30 +25,13 @@
3125
FormulaGeneratorConfig,
3226
)
3327
from ._set_current_bounds import BoundsSetter, ComponentCurrentLimit
34-
from ._state_tracker import EVChargerState, StateTracker
3528
from ._system_bounds_tracker import EVCSystemBoundsTracker
3629

37-
_logger = logging.getLogger(__name__)
38-
3930

4031
class EVChargerPoolError(Exception):
4132
"""An error that occurred in any of the EVChargerPool methods."""
4233

4334

44-
@dataclass(frozen=True)
45-
class EVChargerData:
46-
"""Data for an EV Charger, including the 3-phase current and the component state."""
47-
48-
component_id: int
49-
"""The component ID of the EV Charger."""
50-
51-
current: Sample3Phase[Current]
52-
"""The 3-phase current of the EV Charger."""
53-
54-
state: EVChargerState
55-
"""The state of the EV Charger."""
56-
57-
5835
class EVChargerPool:
5936
"""An interface for interaction with pools of EV Chargers.
6037
@@ -122,10 +99,6 @@ def __init__( # pylint: disable=too-many-arguments
12299
component_categories={ComponentCategory.EV_CHARGER}
123100
)
124101
}
125-
self._state_tracker: StateTracker | None = None
126-
self._status_streams: dict[int, tuple[Task[None], Broadcast[EVChargerData]]] = (
127-
{}
128-
)
129102
self._namespace: str = f"ev-charger-pool-{uuid.uuid4()}"
130103
self._formula_pool: FormulaEnginePool = FormulaEnginePool(
131104
self._namespace,
@@ -201,34 +174,6 @@ def power(self) -> FormulaEngine[Power]:
201174
assert isinstance(engine, FormulaEngine)
202175
return engine
203176

204-
def component_data(self, component_id: int) -> Receiver[EVChargerData]:
205-
"""Stream 3-phase current values and state of an EV Charger.
206-
207-
Args:
208-
component_id: id of the EV Charger for which data is requested.
209-
210-
Returns:
211-
A receiver that streams objects containing 3-phase current and state of
212-
an EV Charger.
213-
"""
214-
if recv := self._status_streams.get(component_id, None):
215-
task, output_chan = recv
216-
if not task.done():
217-
return output_chan.new_receiver()
218-
_logger.warning("Restarting component_status for id: %s", component_id)
219-
else:
220-
output_chan = Broadcast[EVChargerData](
221-
f"evpool-component_status-{component_id}"
222-
)
223-
224-
task = asyncio.create_task(
225-
self._stream_component_data(component_id, output_chan.new_sender())
226-
)
227-
228-
self._status_streams[component_id] = (task, output_chan)
229-
230-
return output_chan.new_receiver()
231-
232177
async def set_bounds(self, component_id: int, max_current: Current) -> None:
233178
"""Send given max current bound for the given EV Charger to the microgrid API.
234179
@@ -260,115 +205,7 @@ async def stop(self) -> None:
260205
"""Stop all tasks and channels owned by the EVChargerPool."""
261206
if self._bounds_setter:
262207
await self._bounds_setter.stop()
263-
if self._state_tracker:
264-
await self._state_tracker.stop()
265208
await self._formula_pool.stop()
266-
for stream in self._status_streams.values():
267-
task, chan = stream
268-
await chan.close()
269-
await cancel_and_await(task)
270-
271-
async def _get_current_streams(self, component_id: int) -> tuple[
272-
Receiver[Sample[Quantity]],
273-
Receiver[Sample[Quantity]],
274-
Receiver[Sample[Quantity]],
275-
]:
276-
"""Fetch current streams from the resampler for each phase.
277-
278-
Args:
279-
component_id: id of EV Charger for which current streams are being fetched.
280-
281-
Returns:
282-
A tuple of 3 receivers stream resampled current values for the given
283-
component id, one for each phase.
284-
"""
285-
286-
async def resampler_subscribe(
287-
metric_id: ComponentMetricId,
288-
) -> Receiver[Sample[Quantity]]:
289-
request = ComponentMetricRequest(
290-
namespace="ev-pool",
291-
component_id=component_id,
292-
metric_id=metric_id,
293-
start_time=None,
294-
)
295-
await self._resampler_subscription_sender.send(request)
296-
return self._channel_registry.get_or_create(
297-
Sample[Quantity], request.get_channel_name()
298-
).new_receiver()
299-
300-
return (
301-
await resampler_subscribe(ComponentMetricId.CURRENT_PHASE_1),
302-
await resampler_subscribe(ComponentMetricId.CURRENT_PHASE_2),
303-
await resampler_subscribe(ComponentMetricId.CURRENT_PHASE_3),
304-
)
305-
306-
async def _stream_component_data(
307-
self,
308-
component_id: int,
309-
sender: Sender[EVChargerData],
310-
) -> None:
311-
"""Stream 3-phase current values and state of an EV Charger.
312-
313-
Args:
314-
component_id: id of the EV Charger for which data is requested.
315-
sender: A sender to stream EV Charger data to.
316-
317-
Raises:
318-
ChannelClosedError: If the channels from the resampler are closed.
319-
"""
320-
if not self._state_tracker:
321-
self._state_tracker = StateTracker(self._component_ids)
322-
323-
(phase_1_rx, phase_2_rx, phase_3_rx) = await self._get_current_streams(
324-
component_id
325-
)
326-
while True:
327-
try:
328-
(phase_1, phase_2, phase_3) = (
329-
await phase_1_rx.receive(),
330-
await phase_2_rx.receive(),
331-
await phase_3_rx.receive(),
332-
)
333-
except ChannelClosedError:
334-
_logger.exception("Streams closed for component_id=%s.", component_id)
335-
raise
336-
337-
sample = Sample3Phase(
338-
timestamp=phase_1.timestamp,
339-
value_p1=(
340-
None
341-
if phase_1.value is None
342-
else Current.from_amperes(phase_1.value.base_value)
343-
),
344-
value_p2=(
345-
None
346-
if phase_2.value is None
347-
else Current.from_amperes(phase_2.value.base_value)
348-
),
349-
value_p3=(
350-
None
351-
if phase_3.value is None
352-
else Current.from_amperes(phase_3.value.base_value)
353-
),
354-
)
355-
356-
if (
357-
phase_1.value is None
358-
and phase_2.value is None
359-
and phase_3.value is None
360-
):
361-
state = EVChargerState.MISSING
362-
else:
363-
state = self._state_tracker.get(component_id)
364-
365-
await sender.send(
366-
EVChargerData(
367-
component_id=component_id,
368-
current=sample,
369-
state=state,
370-
)
371-
)
372209

373210
@property
374211
def _system_power_bounds(self) -> ReceiverFetcher[SystemBounds]:

src/frequenz/sdk/timeseries/ev_charger_pool/_state_tracker.py

Lines changed: 0 additions & 136 deletions
This file was deleted.

0 commit comments

Comments
 (0)