Skip to content

Commit 8750c53

Browse files
committed
Use the GrpcStreamingHelper from client-base
The `GrpcStreamingHelper` keeps track of the task and the conversion, so we can remove a lot of code from the client. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 814b1a9 commit 8750c53

File tree

1 file changed

+51
-104
lines changed

1 file changed

+51
-104
lines changed

src/frequenz/client/microgrid/_client.py

Lines changed: 51 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@
2929
from frequenz.api.microgrid.microgrid_pb2_grpc import MicrogridStub
3030

3131
# pylint: enable=no-name-in-module
32-
from frequenz.channels import Broadcast, Receiver, Sender
32+
from frequenz.channels import Receiver
33+
from frequenz.client.base.grpc_streaming_helper import GrpcStreamingHelper
3334
from frequenz.client.base.retry_strategy import LinearBackoff, RetryStrategy
3435
from google.protobuf.empty_pb2 import Empty # pylint: disable=no-name-in-module
3536

@@ -84,8 +85,7 @@ def __init__(
8485
self.api = MicrogridStub(grpc_channel)
8586
"""The gRPC stub for the microgrid API."""
8687

87-
self._component_streams: dict[int, Broadcast[Any]] = {}
88-
self._streaming_tasks: dict[int, asyncio.Task[None]] = {}
88+
self._broadcasters: dict[int, GrpcStreamingHelper[Any, Any]] = {}
8989
self._retry_spec = retry_spec
9090

9191
async def components(self) -> Iterable[Component]:
@@ -234,94 +234,49 @@ async def connections(
234234

235235
return result
236236

237-
async def _component_data_task(
238-
self,
239-
component_id: int,
240-
transform: Callable[[PbComponentData], _ComponentDataT],
241-
sender: Sender[_ComponentDataT],
242-
) -> None:
243-
"""Read data from the microgrid API and send to a channel.
244-
245-
Args:
246-
component_id: id of the component to get data for.
247-
transform: A method for transforming raw component data into the
248-
desired output type.
249-
sender: A channel sender, to send the component data to.
250-
"""
251-
retry_spec: RetryStrategy = self._retry_spec.copy()
252-
while True:
253-
_logger.debug(
254-
"Making call to `GetComponentData`, for component_id=%d", component_id
255-
)
256-
try:
257-
call = self.api.StreamComponentData(
258-
PbComponentIdParam(id=component_id),
259-
)
260-
# grpc.aio is missing types and mypy thinks this is not
261-
# async iterable, but it is
262-
async for msg in call: # type: ignore[attr-defined]
263-
await sender.send(transform(msg))
264-
except grpc.aio.AioRpcError as err:
265-
api_details = f"Microgrid API: {self.target}."
266-
_logger.exception(
267-
"`GetComponentData`, for component_id=%d: exception: %s api: %s",
268-
component_id,
269-
err,
270-
api_details,
271-
)
272-
273-
if interval := retry_spec.next_interval():
274-
_logger.warning(
275-
"`GetComponentData`, for component_id=%d: connection ended, "
276-
"retrying %s in %0.3f seconds.",
277-
component_id,
278-
retry_spec.get_progress(),
279-
interval,
280-
)
281-
await asyncio.sleep(interval)
282-
else:
283-
_logger.warning(
284-
"`GetComponentData`, for component_id=%d: connection ended, "
285-
"retry limit exceeded %s.",
286-
component_id,
287-
retry_spec.get_progress(),
288-
)
289-
break
290-
291-
def _get_component_data_channel(
237+
async def _new_component_data_receiver(
292238
self,
239+
*,
293240
component_id: int,
241+
expected_category: ComponentCategory,
294242
transform: Callable[[PbComponentData], _ComponentDataT],
295-
) -> Broadcast[_ComponentDataT]:
296-
"""Return the broadcast channel for a given component_id.
243+
maxsize: int,
244+
) -> Receiver[_ComponentDataT]:
245+
"""Return a new broadcaster receiver for a given `component_id`.
297246
298-
If a broadcast channel for the given component_id doesn't exist, create
299-
a new channel and a task for reading data from the microgrid api and
300-
sending them to the channel.
247+
If a broadcaster for the given `component_id` doesn't exist, it creates a new
248+
one.
301249
302250
Args:
303251
component_id: id of the component to get data for.
252+
expected_category: Category of the component to get data for.
304253
transform: A method for transforming raw component data into the
305254
desired output type.
255+
maxsize: Size of the receiver's buffer.
306256
307257
Returns:
308-
The channel for the given component_id.
258+
The new receiver for the given `component_id`.
309259
"""
310-
if component_id in self._component_streams:
311-
return self._component_streams[component_id]
312-
task_name = f"raw-component-data-{component_id}"
313-
chan = Broadcast[_ComponentDataT](task_name, resend_latest=True)
314-
self._component_streams[component_id] = chan
315-
316-
self._streaming_tasks[component_id] = asyncio.create_task(
317-
self._component_data_task(
318-
component_id,
260+
await self._expect_category(
261+
component_id,
262+
expected_category,
263+
)
264+
265+
broadcaster = self._broadcasters.setdefault(
266+
component_id,
267+
GrpcStreamingHelper(
268+
f"raw-component-data-{component_id}",
269+
# We need to cast here because grpc says StreamComponentData is
270+
# a grpc.CallIterator[PbComponentData], not a
271+
# grpc.aio.UnaryStreamCall[..., PbComponentData].
272+
lambda: cast(
273+
grpc.aio.UnaryStreamCall[Any, PbComponentData],
274+
self.api.StreamComponentData(PbComponentIdParam(id=component_id)),
275+
),
319276
transform,
320-
chan.new_sender(),
321277
),
322-
name=task_name,
323278
)
324-
return chan
279+
return broadcaster.new_receiver(maxsize=maxsize)
325280

326281
async def _expect_category(
327282
self,
@@ -372,14 +327,12 @@ async def meter_data( # noqa: DOC502 (ValueError is raised indirectly by _expec
372327
Returns:
373328
A channel receiver that provides realtime meter data.
374329
"""
375-
await self._expect_category(
376-
component_id,
377-
ComponentCategory.METER,
330+
return await self._new_component_data_receiver(
331+
component_id=component_id,
332+
expected_category=ComponentCategory.METER,
333+
transform=MeterData.from_proto,
334+
maxsize=maxsize,
378335
)
379-
return self._get_component_data_channel(
380-
component_id,
381-
MeterData.from_proto,
382-
).new_receiver(maxsize=maxsize)
383336

384337
async def battery_data( # noqa: DOC502 (ValueError is raised indirectly by _expect_category)
385338
self,
@@ -398,14 +351,12 @@ async def battery_data( # noqa: DOC502 (ValueError is raised indirectly by _exp
398351
Returns:
399352
A channel receiver that provides realtime battery data.
400353
"""
401-
await self._expect_category(
402-
component_id,
403-
ComponentCategory.BATTERY,
354+
return await self._new_component_data_receiver(
355+
component_id=component_id,
356+
expected_category=ComponentCategory.BATTERY,
357+
transform=BatteryData.from_proto,
358+
maxsize=maxsize,
404359
)
405-
return self._get_component_data_channel(
406-
component_id,
407-
BatteryData.from_proto,
408-
).new_receiver(maxsize=maxsize)
409360

410361
async def inverter_data( # noqa: DOC502 (ValueError is raised indirectly by _expect_category)
411362
self,
@@ -424,14 +375,12 @@ async def inverter_data( # noqa: DOC502 (ValueError is raised indirectly by _ex
424375
Returns:
425376
A channel receiver that provides realtime inverter data.
426377
"""
427-
await self._expect_category(
428-
component_id,
429-
ComponentCategory.INVERTER,
378+
return await self._new_component_data_receiver(
379+
component_id=component_id,
380+
expected_category=ComponentCategory.INVERTER,
381+
transform=InverterData.from_proto,
382+
maxsize=maxsize,
430383
)
431-
return self._get_component_data_channel(
432-
component_id,
433-
InverterData.from_proto,
434-
).new_receiver(maxsize=maxsize)
435384

436385
async def ev_charger_data( # noqa: DOC502 (ValueError is raised indirectly by _expect_category)
437386
self,
@@ -450,14 +399,12 @@ async def ev_charger_data( # noqa: DOC502 (ValueError is raised indirectly by _
450399
Returns:
451400
A channel receiver that provides realtime ev charger data.
452401
"""
453-
await self._expect_category(
454-
component_id,
455-
ComponentCategory.EV_CHARGER,
402+
return await self._new_component_data_receiver(
403+
component_id=component_id,
404+
expected_category=ComponentCategory.EV_CHARGER,
405+
transform=EVChargerData.from_proto,
406+
maxsize=maxsize,
456407
)
457-
return self._get_component_data_channel(
458-
component_id,
459-
EVChargerData.from_proto,
460-
).new_receiver(maxsize=maxsize)
461408

462409
async def set_power(self, component_id: int, power_w: float) -> None:
463410
"""Send request to the Microgrid to set power for component.

0 commit comments

Comments
 (0)