Skip to content

Commit 38cbb82

Browse files
committed
Remove support for streaming per-component data from EVChargerPool
Also remove per-component status tracking support. The EVChargerPool will not expose individual EV chargers anymore and will instead provide a first-come-first-serve control algorithm through the PowerDistributingActor (which can be swapped when users want to provide a custom power distribution algorithm). Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 096b8b4 commit 38cbb82

File tree

4 files changed

+5
-479
lines changed

4 files changed

+5
-479
lines changed

src/frequenz/sdk/timeseries/ev_charger_pool/__init__.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,11 @@
33

44
"""Interactions with EV Chargers."""
55

6-
from ._ev_charger_pool import EVChargerData, EVChargerPool, EVChargerPoolError
6+
from ._ev_charger_pool import EVChargerPool, EVChargerPoolError
77
from ._set_current_bounds import ComponentCurrentLimit
8-
from ._state_tracker import EVChargerState
98

109
__all__ = [
1110
"ComponentCurrentLimit",
1211
"EVChargerPool",
13-
"EVChargerData",
1412
"EVChargerPoolError",
15-
"EVChargerState",
1613
]

src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py

Lines changed: 3 additions & 166 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,16 @@
44
"""Interactions with pools of EV Chargers."""
55

66

7-
import asyncio
8-
import logging
97
import uuid
10-
from asyncio import Task
118
from collections import abc
12-
from dataclasses import dataclass
139
from datetime import timedelta
1410

15-
from frequenz.channels import Broadcast, ChannelClosedError, Receiver, Sender
16-
from frequenz.client.microgrid import ComponentCategory, ComponentMetricId
11+
from frequenz.channels import Sender
12+
from frequenz.client.microgrid import ComponentCategory
1713

18-
from ..._internal._asyncio import cancel_and_await
1914
from ...actor import ChannelRegistry, ComponentMetricRequest
2015
from ...microgrid import connection_manager
21-
from .. import Sample, Sample3Phase
22-
from .._quantities import Current, Power, Quantity
16+
from .._quantities import Current, Power
2317
from ..formula_engine import FormulaEngine, FormulaEngine3Phase
2418
from ..formula_engine._formula_engine_pool import FormulaEnginePool
2519
from ..formula_engine._formula_generators import (
@@ -28,29 +22,12 @@
2822
FormulaGeneratorConfig,
2923
)
3024
from ._set_current_bounds import BoundsSetter, ComponentCurrentLimit
31-
from ._state_tracker import EVChargerState, StateTracker
32-
33-
_logger = logging.getLogger(__name__)
3425

3526

3627
class EVChargerPoolError(Exception):
3728
"""An error that occurred in any of the EVChargerPool methods."""
3829

3930

40-
@dataclass(frozen=True)
41-
class EVChargerData:
42-
"""Data for an EV Charger, including the 3-phase current and the component state."""
43-
44-
component_id: int
45-
"""The component ID of the EV Charger."""
46-
47-
current: Sample3Phase[Current]
48-
"""The 3-phase current of the EV Charger."""
49-
50-
state: EVChargerState
51-
"""The state of the EV Charger."""
52-
53-
5431
class EVChargerPool:
5532
"""An interface for interaction with pools of EV Chargers.
5633
@@ -114,10 +91,6 @@ def __init__(
11491
component_categories={ComponentCategory.EV_CHARGER}
11592
)
11693
}
117-
self._state_tracker: StateTracker | None = None
118-
self._status_streams: dict[int, tuple[Task[None], Broadcast[EVChargerData]]] = (
119-
{}
120-
)
12194
self._namespace: str = f"ev-charger-pool-{uuid.uuid4()}"
12295
self._formula_pool: FormulaEnginePool = FormulaEnginePool(
12396
self._namespace,
@@ -185,34 +158,6 @@ def power(self) -> FormulaEngine[Power]:
185158
assert isinstance(engine, FormulaEngine)
186159
return engine
187160

188-
def component_data(self, component_id: int) -> Receiver[EVChargerData]:
189-
"""Stream 3-phase current values and state of an EV Charger.
190-
191-
Args:
192-
component_id: id of the EV Charger for which data is requested.
193-
194-
Returns:
195-
A receiver that streams objects containing 3-phase current and state of
196-
an EV Charger.
197-
"""
198-
if recv := self._status_streams.get(component_id, None):
199-
task, output_chan = recv
200-
if not task.done():
201-
return output_chan.new_receiver()
202-
_logger.warning("Restarting component_status for id: %s", component_id)
203-
else:
204-
output_chan = Broadcast[EVChargerData](
205-
name=f"evpool-component_status-{component_id}"
206-
)
207-
208-
task = asyncio.create_task(
209-
self._stream_component_data(component_id, output_chan.new_sender())
210-
)
211-
212-
self._status_streams[component_id] = (task, output_chan)
213-
214-
return output_chan.new_receiver()
215-
216161
async def set_bounds(self, component_id: int, max_current: Current) -> None:
217162
"""Send given max current bound for the given EV Charger to the microgrid API.
218163
@@ -244,112 +189,4 @@ async def stop(self) -> None:
244189
"""Stop all tasks and channels owned by the EVChargerPool."""
245190
if self._bounds_setter:
246191
await self._bounds_setter.stop()
247-
if self._state_tracker:
248-
await self._state_tracker.stop()
249192
await self._formula_pool.stop()
250-
for stream in self._status_streams.values():
251-
task, chan = stream
252-
await chan.close()
253-
await cancel_and_await(task)
254-
255-
async def _get_current_streams(self, component_id: int) -> tuple[
256-
Receiver[Sample[Quantity]],
257-
Receiver[Sample[Quantity]],
258-
Receiver[Sample[Quantity]],
259-
]:
260-
"""Fetch current streams from the resampler for each phase.
261-
262-
Args:
263-
component_id: id of EV Charger for which current streams are being fetched.
264-
265-
Returns:
266-
A tuple of 3 receivers stream resampled current values for the given
267-
component id, one for each phase.
268-
"""
269-
270-
async def resampler_subscribe(
271-
metric_id: ComponentMetricId,
272-
) -> Receiver[Sample[Quantity]]:
273-
request = ComponentMetricRequest(
274-
namespace="ev-pool",
275-
component_id=component_id,
276-
metric_id=metric_id,
277-
start_time=None,
278-
)
279-
await self._resampler_subscription_sender.send(request)
280-
return self._channel_registry.get_or_create(
281-
Sample[Quantity], request.get_channel_name()
282-
).new_receiver()
283-
284-
return (
285-
await resampler_subscribe(ComponentMetricId.CURRENT_PHASE_1),
286-
await resampler_subscribe(ComponentMetricId.CURRENT_PHASE_2),
287-
await resampler_subscribe(ComponentMetricId.CURRENT_PHASE_3),
288-
)
289-
290-
async def _stream_component_data(
291-
self,
292-
component_id: int,
293-
sender: Sender[EVChargerData],
294-
) -> None:
295-
"""Stream 3-phase current values and state of an EV Charger.
296-
297-
Args:
298-
component_id: id of the EV Charger for which data is requested.
299-
sender: A sender to stream EV Charger data to.
300-
301-
Raises:
302-
ChannelClosedError: If the channels from the resampler are closed.
303-
"""
304-
if not self._state_tracker:
305-
self._state_tracker = StateTracker(self._component_ids)
306-
307-
(phase_1_rx, phase_2_rx, phase_3_rx) = await self._get_current_streams(
308-
component_id
309-
)
310-
while True:
311-
try:
312-
(phase_1, phase_2, phase_3) = (
313-
await phase_1_rx.receive(),
314-
await phase_2_rx.receive(),
315-
await phase_3_rx.receive(),
316-
)
317-
except ChannelClosedError:
318-
_logger.exception("Streams closed for component_id=%s.", component_id)
319-
raise
320-
321-
sample = Sample3Phase(
322-
timestamp=phase_1.timestamp,
323-
value_p1=(
324-
None
325-
if phase_1.value is None
326-
else Current.from_amperes(phase_1.value.base_value)
327-
),
328-
value_p2=(
329-
None
330-
if phase_2.value is None
331-
else Current.from_amperes(phase_2.value.base_value)
332-
),
333-
value_p3=(
334-
None
335-
if phase_3.value is None
336-
else Current.from_amperes(phase_3.value.base_value)
337-
),
338-
)
339-
340-
if (
341-
phase_1.value is None
342-
and phase_2.value is None
343-
and phase_3.value is None
344-
):
345-
state = EVChargerState.MISSING
346-
else:
347-
state = self._state_tracker.get(component_id)
348-
349-
await sender.send(
350-
EVChargerData(
351-
component_id=component_id,
352-
current=sample,
353-
state=state,
354-
)
355-
)

src/frequenz/sdk/timeseries/ev_charger_pool/_state_tracker.py

Lines changed: 0 additions & 135 deletions
This file was deleted.

0 commit comments

Comments
 (0)