Skip to content

Commit a95bc8f

Browse files
Add component_data method to EVChargerPool (#266)
This PR: 1. introduces the `component_data` method to the EVChargerPool, which ties the resampled 3-phase current values for an ev charger with the component state, which comes directly from the microgrid, streaming objects of type: ``` python @DataClass(frozen=True) class EVChargerData: """Data for an EV Charger, including the 3-phase current and the component state.""" component_id: int current: Sample3Phase state: EVChargerState ``` This is done so that the state-tracking code can piggy-back on the resampler to detect if a component is missing data, and switch its state to `MISSING` whenever that is the case. 3. adds these methods for `Sample3Phase`: `__iter__, min, max, map`.
2 parents 08d08fd + 8f5b067 commit a95bc8f

File tree

6 files changed

+359
-136
lines changed

6 files changed

+359
-136
lines changed

src/frequenz/sdk/timeseries/_base_types.py

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33

44
"""Timeseries basic types."""
55

6+
from __future__ import annotations
7+
8+
import functools
69
from dataclasses import dataclass, field
710
from datetime import datetime
8-
from typing import Optional
11+
from typing import Callable, Iterator, Optional, overload
912

1013

1114
# Ordering by timestamp is a bit arbitrary, and it is not always what might be
@@ -48,3 +51,85 @@ class Sample3Phase:
4851

4952
value_p3: Optional[float]
5053
"""The value of the 3rd phase in this sample."""
54+
55+
def __iter__(self) -> Iterator[float | None]:
56+
"""Return an iterator that yields values from each of the phases.
57+
58+
Yields:
59+
Per-phase measurements one-by-one.
60+
"""
61+
yield self.value_p1
62+
yield self.value_p2
63+
yield self.value_p3
64+
65+
@overload
66+
def max(self, default: float) -> float:
67+
...
68+
69+
@overload
70+
def max(self, default: None = None) -> float | None:
71+
...
72+
73+
def max(self, default: float | None = None) -> float | None:
74+
"""Return the max value among all phases, or default if they are all `None`.
75+
76+
Args:
77+
default: value to return if all phases are `None`.
78+
79+
Returns:
80+
Max value among all phases, if available, default value otherwise.
81+
"""
82+
if not any(self):
83+
return default
84+
value: float = functools.reduce(
85+
lambda x, y: x if x > y else y,
86+
filter(None, self),
87+
)
88+
return value
89+
90+
@overload
91+
def min(self, default: float) -> float:
92+
...
93+
94+
@overload
95+
def min(self, default: None = None) -> float | None:
96+
...
97+
98+
def min(self, default: float | None = None) -> float | None:
99+
"""Return the min value among all phases, or default if they are all `None`.
100+
101+
Args:
102+
default: value to return if all phases are `None`.
103+
104+
Returns:
105+
Min value among all phases, if available, default value otherwise.
106+
"""
107+
if not any(self):
108+
return default
109+
value: float = functools.reduce(
110+
lambda x, y: x if x < y else y,
111+
filter(None, self),
112+
)
113+
return value
114+
115+
def map(
116+
self, function: Callable[[float], float], default: float | None = None
117+
) -> Sample3Phase:
118+
"""Apply the given function on each of the phase values and return the result.
119+
120+
If a phase value is `None`, replace it with `default` instead.
121+
122+
Args:
123+
function: The function to apply on each of the phase values.
124+
default: The value to apply if a phase value is `None`.
125+
126+
Returns:
127+
A new `Sample3Phase` instance, with the given function applied on values
128+
for each of the phases.
129+
"""
130+
return Sample3Phase(
131+
timestamp=self.timestamp,
132+
value_p1=default if self.value_p1 is None else function(self.value_p1),
133+
value_p2=default if self.value_p2 is None else function(self.value_p2),
134+
value_p3=default if self.value_p3 is None else function(self.value_p3),
135+
)

src/frequenz/sdk/timeseries/ev_charger_pool/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,12 @@
33

44
"""Interactions with EV Chargers."""
55

6-
from ._ev_charger_pool import EVChargerPool
6+
from ._ev_charger_pool import EVChargerData, EVChargerPool, EVChargerPoolError
7+
from ._state_tracker import EVChargerState
78

89
__all__ = [
910
"EVChargerPool",
11+
"EVChargerData",
12+
"EVChargerPoolError",
13+
"EVChargerState",
1014
]

src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py

Lines changed: 127 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,26 @@
55

66
from __future__ import annotations
77

8+
import asyncio
89
import logging
910
import uuid
11+
from asyncio import Task
12+
from collections import abc
13+
from dataclasses import dataclass
1014

11-
from frequenz.channels import Sender
15+
from frequenz.channels import Broadcast, ChannelClosedError, Receiver, Sender
1216

1317
from ...actor import ChannelRegistry, ComponentMetricRequest
1418
from ...microgrid import connection_manager
15-
from ...microgrid.component import ComponentCategory
19+
from ...microgrid.component import ComponentCategory, ComponentMetricId
20+
from .. import Sample, Sample3Phase
1621
from .._formula_engine import FormulaEnginePool, FormulaReceiver, FormulaReceiver3Phase
1722
from .._formula_engine._formula_generators import (
1823
EVChargerCurrentFormula,
1924
EVChargerPowerFormula,
2025
FormulaGeneratorConfig,
2126
)
27+
from ._state_tracker import EVChargerState, StateTracker
2228

2329
logger = logging.getLogger(__name__)
2430

@@ -27,6 +33,15 @@ class EVChargerPoolError(Exception):
2733
"""An error that occurred in any of the EVChargerPool methods."""
2834

2935

36+
@dataclass(frozen=True)
37+
class EVChargerData:
38+
"""Data for an EV Charger, including the 3-phase current and the component state."""
39+
40+
component_id: int
41+
current: Sample3Phase
42+
state: EVChargerState
43+
44+
3045
class EVChargerPool:
3146
"""Interactions with EV Chargers."""
3247

@@ -62,14 +77,27 @@ def __init__(
6277
component_category={ComponentCategory.EV_CHARGER}
6378
)
6479
}
80+
self._state_tracker: StateTracker | None = None
81+
self._status_streams: dict[
82+
int, tuple[Task[None], Broadcast[EVChargerData]]
83+
] = {}
6584
self._namespace: str = f"ev-charger-pool-{uuid.uuid4()}"
6685
self._formula_pool: FormulaEnginePool = FormulaEnginePool(
6786
self._namespace,
6887
self._channel_registry,
6988
self._resampler_subscription_sender,
7089
)
7190

72-
async def total_current(self) -> FormulaReceiver3Phase:
91+
@property
92+
def component_ids(self) -> abc.Set[int]:
93+
"""Return component IDs of all EV Chargers managed by this EVChargerPool.
94+
95+
Returns:
96+
Set of managed component IDs.
97+
"""
98+
return self._component_ids
99+
100+
async def current(self) -> FormulaReceiver3Phase:
73101
"""Fetch the total current for the EV Chargers in the pool.
74102
75103
If a formula engine to calculate EV Charger current is not already running, it
@@ -85,7 +113,7 @@ async def total_current(self) -> FormulaReceiver3Phase:
85113
FormulaGeneratorConfig(component_ids=self._component_ids),
86114
)
87115

88-
async def total_power(self) -> FormulaReceiver:
116+
async def power(self) -> FormulaReceiver:
89117
"""Fetch the total power for the EV Chargers in the pool.
90118
91119
If a formula engine to calculate EV Charger power is not already running, it
@@ -102,49 +130,114 @@ async def total_power(self) -> FormulaReceiver:
102130
FormulaGeneratorConfig(component_ids=self._component_ids),
103131
)
104132

105-
async def current(self, component_id: int) -> FormulaReceiver3Phase:
106-
"""Fetch the 3-phase current for the given EV Charger id.
133+
async def component_data(self, component_id: int) -> Receiver[EVChargerData]:
134+
"""Stream 3-phase current values and state of an EV Charger.
107135
108136
Args:
109-
component_id: id of the EV Charger to stream current values for.
137+
component_id: id of the EV Charger for which data is requested.
110138
111139
Returns:
112-
A *new* receiver that will stream 3-phase current values for the given
113-
EV Charger.
114-
115-
Raises:
116-
EVChargerPoolError: if the given component_id is not part of the pool.
140+
A receiver that streams objects containing 3-phase current and state of
141+
an EV Charger.
117142
"""
118-
if component_id not in self._component_ids:
119-
raise EVChargerPoolError(
120-
f"{component_id=} is not part of the EVChargerPool"
121-
f" (with ids={self._component_ids})"
143+
if recv := self._status_streams.get(component_id, None):
144+
task, output_chan = recv
145+
if not task.done():
146+
return output_chan.new_receiver()
147+
logger.warning("Restarting component_status for id: %s", component_id)
148+
else:
149+
output_chan = Broadcast[EVChargerData](
150+
f"evpool-component_status-{component_id}"
122151
)
123-
return await self._formula_pool.from_generator(
124-
f"ev_charger_current_{component_id}",
125-
EVChargerCurrentFormula,
126-
FormulaGeneratorConfig(component_ids={component_id}),
152+
153+
task = asyncio.create_task(
154+
self._stream_component_data(component_id, output_chan.new_sender())
127155
)
128156

129-
async def power(self, component_id: int) -> FormulaReceiver:
130-
"""Fetch the power for the given EV Charger id.
157+
self._status_streams[component_id] = (task, output_chan)
158+
159+
return output_chan.new_receiver()
160+
161+
async def _get_current_streams(
162+
self, component_id: int
163+
) -> tuple[Receiver[Sample], Receiver[Sample], Receiver[Sample]]:
164+
"""Fetch current streams from the resampler for each phase.
131165
132166
Args:
133-
component_id: id of the EV Charger to stream power values for.
167+
component_id: id of EV Charger for which current streams are being fetched.
134168
135169
Returns:
136-
A *new* receiver that will stream power values for the given EV Charger.
170+
A tuple of 3 receivers stream resampled current values for the given
171+
component id, one for each phase.
172+
"""
173+
174+
async def resampler_subscribe(metric_id: ComponentMetricId) -> Receiver[Sample]:
175+
request = ComponentMetricRequest(
176+
namespace="ev-pool",
177+
component_id=component_id,
178+
metric_id=metric_id,
179+
start_time=None,
180+
)
181+
await self._resampler_subscription_sender.send(request)
182+
return self._channel_registry.new_receiver(request.get_channel_name())
183+
184+
return (
185+
await resampler_subscribe(ComponentMetricId.CURRENT_PHASE_1),
186+
await resampler_subscribe(ComponentMetricId.CURRENT_PHASE_2),
187+
await resampler_subscribe(ComponentMetricId.CURRENT_PHASE_3),
188+
)
189+
190+
async def _stream_component_data(
191+
self,
192+
component_id: int,
193+
sender: Sender[EVChargerData],
194+
) -> None:
195+
"""Stream 3-phase current values and state of an EV Charger.
196+
197+
Args:
198+
component_id: id of the EV Charger for which data is requested.
199+
sender: A sender to stream EV Charger data to.
137200
138201
Raises:
139-
EVChargerPoolError: if the given component_id is not part of the pool.
202+
ChannelClosedError: If the channels from the resampler are closed.
140203
"""
141-
if component_id not in self._component_ids:
142-
raise EVChargerPoolError(
143-
f"{component_id=} is not part of the EVChargerPool"
144-
f" (with ids={self._component_ids})"
145-
)
146-
return await self._formula_pool.from_generator(
147-
f"ev_charger_current_{component_id}",
148-
EVChargerPowerFormula,
149-
FormulaGeneratorConfig(component_ids={component_id}),
204+
if not self._state_tracker:
205+
self._state_tracker = StateTracker(self._component_ids)
206+
207+
(phase_1_rx, phase_2_rx, phase_3_rx) = await self._get_current_streams(
208+
component_id
150209
)
210+
while True:
211+
try:
212+
(phase_1, phase_2, phase_3) = (
213+
await phase_1_rx.receive(),
214+
await phase_2_rx.receive(),
215+
await phase_3_rx.receive(),
216+
)
217+
except ChannelClosedError:
218+
logger.exception("Streams closed for component_id=%s.", component_id)
219+
raise
220+
221+
sample = Sample3Phase(
222+
timestamp=phase_1.timestamp,
223+
value_p1=phase_1.value,
224+
value_p2=phase_2.value,
225+
value_p3=phase_3.value,
226+
)
227+
228+
if (
229+
phase_1.value is None
230+
and phase_2.value is None
231+
and phase_3.value is None
232+
):
233+
state = EVChargerState.MISSING
234+
else:
235+
state = self._state_tracker.get(component_id)
236+
237+
await sender.send(
238+
EVChargerData(
239+
component_id=component_id,
240+
current=sample,
241+
state=state,
242+
)
243+
)

0 commit comments

Comments
 (0)