Skip to content

Commit efa3cf5

Browse files
Process voltage phases dynamically
Apply suggestions from code review. Co-authored-by: Leandro Lucarella <[email protected]> Signed-off-by: daniel-zullo-frequenz <[email protected]>
1 parent 8f2194e commit efa3cf5

File tree

1 file changed

+26
-30
lines changed

1 file changed

+26
-30
lines changed

src/frequenz/sdk/timeseries/_voltage_streaming.py

Lines changed: 26 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
from ..actor import ChannelRegistry
1919
from ..microgrid import connection_manager
2020
from ..microgrid.component import Component, ComponentCategory, ComponentMetricId
21-
from ..timeseries._base_types import Sample3Phase
22-
from ..timeseries._quantities import Voltage
21+
from ..timeseries._base_types import Sample, Sample3Phase
22+
from ..timeseries._quantities import Quantity, Voltage
2323

2424
if TYPE_CHECKING:
2525
# Imported here to avoid a circular import.
@@ -133,25 +133,22 @@ async def _send_request(self) -> None:
133133
ComponentMetricRequest,
134134
)
135135

136-
def _create_request(phase: ComponentMetricId) -> ComponentMetricRequest:
137-
return ComponentMetricRequest(
138-
self._namespace,
139-
self._source_component.component_id,
140-
phase,
141-
None,
136+
metric_ids = (
137+
ComponentMetricId.VOLTAGE_PHASE_1,
138+
ComponentMetricId.VOLTAGE_PHASE_2,
139+
ComponentMetricId.VOLTAGE_PHASE_3,
140+
)
141+
phases_rx: list[Receiver[Sample[Quantity]]] = []
142+
for metric_id in metric_ids:
143+
req = ComponentMetricRequest(
144+
self._namespace, self._source_component.component_id, metric_id, None
142145
)
143146

144-
phase_1_req = _create_request(ComponentMetricId.VOLTAGE_PHASE_1)
145-
phase_2_req = _create_request(ComponentMetricId.VOLTAGE_PHASE_2)
146-
phase_3_req = _create_request(ComponentMetricId.VOLTAGE_PHASE_3)
147-
148-
await self._resampler_subscription_sender.send(phase_1_req)
149-
await self._resampler_subscription_sender.send(phase_2_req)
150-
await self._resampler_subscription_sender.send(phase_3_req)
147+
await self._resampler_subscription_sender.send(req)
151148

152-
phase_1_rx = self._channel_registry.new_receiver(phase_1_req.get_channel_name())
153-
phase_2_rx = self._channel_registry.new_receiver(phase_2_req.get_channel_name())
154-
phase_3_rx = self._channel_registry.new_receiver(phase_3_req.get_channel_name())
149+
phases_rx.append(
150+
self._channel_registry.new_receiver(req.get_channel_name())
151+
)
155152

156153
sender = self._channel_registry.new_sender(self._channel_key)
157154

@@ -161,25 +158,24 @@ def _create_request(phase: ComponentMetricId) -> ComponentMetricRequest:
161158

162159
while True:
163160
try:
164-
phase_1 = await phase_1_rx.receive()
165-
phase_2 = await phase_2_rx.receive()
166-
phase_3 = await phase_3_rx.receive()
161+
phases = [await r.receive() for r in phases_rx]
167162

168-
if phase_1 is None or phase_2 is None or phase_3 is None:
163+
if not all(map(lambda p: p is not None, phases)):
169164
_logger.warning(
170-
"Received None from voltage request: %s (%s, %s, %s)",
165+
"Received None from voltage request: %s %s",
171166
self._source_component,
172-
phase_1,
173-
phase_2,
174-
phase_3,
167+
phases,
175168
)
176169
continue
177170

178171
msg = Sample3Phase(
179-
phase_1.timestamp,
180-
Voltage.from_volts(phase_1.value.base_value),
181-
Voltage.from_volts(phase_2.value.base_value),
182-
Voltage.from_volts(phase_3.value.base_value),
172+
phases[0].timestamp,
173+
*map(
174+
lambda p: Voltage.from_volts(p.value.base_value)
175+
if p.value
176+
else None,
177+
phases,
178+
),
183179
)
184180
except asyncio.CancelledError:
185181
_logger.exception(

0 commit comments

Comments
 (0)