Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,17 @@

## Upgrading

<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
* When receiving streaming data for components, you now need to handle the receiving of the types `StreamStarted`, `StreamRetrying`, `StreamFatalError`. If you don't care about the new events and just want the old behavior your can always use `filter_stream_events` to ignore them, for example:

```python
from frequenz.client.base.streaming import filter_stream_events

meter_rx = filter_stream_events(await client.meter_data())
```

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
* Using the latest streaming client, when using `stream_sensor_data()` you will now get stream notification events, such as `StreamStarted`, `StreamRetrying` and `StreamFatalError`, which can be used to monitor the state of the stream.

## Bug Fixes

Expand Down
7 changes: 4 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ classifiers = [
requires-python = ">= 3.11, < 4"
dependencies = [
"frequenz-api-microgrid >= 0.15.3, < 0.16.0",
"frequenz-channels >= 1.0.0-rc1, < 2.0.0",
"frequenz-client-base >= 0.8.0, < 0.12.0",
"frequenz-channels >= 1.6.1, < 2.0.0",
#"frequenz-client-base >= 0.11.0, < 0.12.0",
"frequenz-client-base @ git+https://github.com/frequenz-floss/frequenz-client-base-python@refs/pull/153/head",
"grpcio >= 1.59.0, < 2",
"protobuf >= 4.21.6, < 7",
"protobuf >= 5.29.2, < 7",
"timezonefinder >= 6.2.0, < 7",
"typing-extensions >= 4.6.0, < 5",
]
Expand Down
12 changes: 6 additions & 6 deletions src/frequenz/client/microgrid/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ async def _new_component_data_receiver(
expected_category: ComponentCategory,
transform: Callable[[microgrid_pb2.ComponentData], _ComponentDataT],
maxsize: int,
) -> Receiver[_ComponentDataT]:
) -> Receiver[_ComponentDataT | streaming.StreamEvent]:
"""Return a new broadcaster receiver for a given `component_id`.

If a broadcaster for the given `component_id` doesn't exist, it creates a new
Expand Down Expand Up @@ -393,7 +393,7 @@ async def meter_data( # noqa: DOC502 (ValueError is raised indirectly by _expec
self,
component_id: ComponentId,
maxsize: int = RECEIVER_MAX_SIZE,
) -> Receiver[MeterData]:
) -> Receiver[MeterData | streaming.StreamEvent]:
"""Return a channel receiver that provides a `MeterData` stream.

Raises:
Expand All @@ -417,7 +417,7 @@ async def battery_data( # noqa: DOC502 (ValueError is raised indirectly by _exp
self,
component_id: ComponentId,
maxsize: int = RECEIVER_MAX_SIZE,
) -> Receiver[BatteryData]:
) -> Receiver[BatteryData | streaming.StreamEvent]:
"""Return a channel receiver that provides a `BatteryData` stream.

Raises:
Expand All @@ -441,7 +441,7 @@ async def inverter_data( # noqa: DOC502 (ValueError is raised indirectly by _ex
self,
component_id: ComponentId,
maxsize: int = RECEIVER_MAX_SIZE,
) -> Receiver[InverterData]:
) -> Receiver[InverterData | streaming.StreamEvent]:
"""Return a channel receiver that provides an `InverterData` stream.

Raises:
Expand All @@ -465,7 +465,7 @@ async def ev_charger_data( # noqa: DOC502 (ValueError is raised indirectly by _
self,
component_id: ComponentId,
maxsize: int = RECEIVER_MAX_SIZE,
) -> Receiver[EVChargerData]:
) -> Receiver[EVChargerData | streaming.StreamEvent]:
"""Return a channel receiver that provides an `EvChargeData` stream.

Raises:
Expand Down Expand Up @@ -591,7 +591,7 @@ def stream_sensor_data(
metrics: Iterable[SensorMetric | int] | None = None,
*,
buffer_size: int = 50,
) -> Receiver[SensorDataSamples]:
) -> Receiver[SensorDataSamples | streaming.StreamEvent]:
"""Stream data samples from a sensor.

Warning:
Expand Down
27 changes: 26 additions & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from frequenz.api.common import components_pb2, metrics_pb2
from frequenz.api.microgrid import grid_pb2, inverter_pb2, microgrid_pb2, sensor_pb2
from frequenz.client.base import conversion, retry
from frequenz.client.base.streaming import StreamRetrying, StreamStarted
from google.protobuf.empty_pb2 import Empty

from frequenz.client.microgrid import (
Expand Down Expand Up @@ -688,6 +689,7 @@ async def stream_data(

client.mock_stub.StreamComponentData.side_effect = stream_data
receiver = await getattr(client, method)(component_id)
assert isinstance(await receiver.receive(), StreamStarted)
latest = await receiver.receive()
assert isinstance(latest, component_class)
assert latest.component_id == component_id
Expand Down Expand Up @@ -736,14 +738,28 @@ async def stream_data(

client.mock_stub.StreamComponentData.side_effect = stream_data
receiver = await getattr(client, method)(component_id)
assert isinstance(await receiver.receive(), StreamStarted)
assert isinstance(await receiver.receive(), StreamRetrying)
assert isinstance(await receiver.receive(), StreamStarted)

latest = await receiver.receive()
assert isinstance(latest, component_class)
assert latest.component_id == component_id

assert isinstance(await receiver.receive(), StreamRetrying)
assert isinstance(await receiver.receive(), StreamStarted)
assert isinstance(await receiver.receive(), StreamRetrying)
assert isinstance(await receiver.receive(), StreamStarted)

latest = await receiver.receive()
assert isinstance(latest, component_class)
assert latest.component_id == component_id

assert isinstance(await receiver.receive(), StreamRetrying)
assert isinstance(await receiver.receive(), StreamStarted)
assert isinstance(await receiver.receive(), StreamRetrying)
assert isinstance(await receiver.receive(), StreamStarted)

latest = await receiver.receive()
assert isinstance(latest, component_class)
assert latest.component_id == component_id
Expand Down Expand Up @@ -947,6 +963,8 @@ async def stream_data_impl(
receiver = client.stream_sensor_data(
SensorId(sensor201.id), [SensorMetric.TEMPERATURE]
)

assert isinstance(await receiver.receive(), StreamStarted)
sample = await receiver.receive()

assert isinstance(sample, SensorDataSamples)
Expand Down Expand Up @@ -997,6 +1015,8 @@ async def stream_data_impl(

client.mock_stub.StreamComponentData.side_effect = stream_data_impl
receiver = client.stream_sensor_data(SensorId(sensor201.id))

assert isinstance(await receiver.receive(), StreamStarted)
sample = await receiver.receive()

assert isinstance(sample, SensorDataSamples)
Expand Down Expand Up @@ -1046,7 +1066,12 @@ async def stream_data_error_impl(
receiver = client.stream_sensor_data(
SensorId(sensor201.id), [SensorMetric.TEMPERATURE]
)
sample = await receiver.receive() # Should succeed after retries
assert isinstance(await receiver.receive(), StreamStarted)
assert isinstance(await receiver.receive(), StreamRetrying)
assert isinstance(await receiver.receive(), StreamStarted)
assert isinstance(await receiver.receive(), StreamRetrying)
assert isinstance(await receiver.receive(), StreamStarted)
sample = await receiver.receive() # Get the actual sample

assert isinstance(sample, SensorDataSamples)
assert int(sample.sensor_id) == sensor201.id
Expand Down
Loading