diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 80e5f0cb..36a5616e 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,11 +6,17 @@ ## Upgrading - +* When receiving streaming data for components, you now need to handle the receiving of the types `StreamStarted`, `StreamRetrying`, `StreamFatalError`. If you don't care about the new events and just want the old behavior your can always use `filter_stream_events` to ignore them, for example: + + ```python + from frequenz.client.base.streaming import filter_stream_events + + meter_rx = filter_stream_events(await client.meter_data()) + ``` ## New Features - +* Using the latest streaming client, when using `stream_sensor_data()` you will now get stream notification events, such as `StreamStarted`, `StreamRetrying` and `StreamFatalError`, which can be used to monitor the state of the stream. ## Bug Fixes diff --git a/pyproject.toml b/pyproject.toml index 2a0290e6..11e0ebbd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,10 +37,11 @@ classifiers = [ requires-python = ">= 3.11, < 4" dependencies = [ "frequenz-api-microgrid >= 0.15.3, < 0.16.0", - "frequenz-channels >= 1.0.0-rc1, < 2.0.0", - "frequenz-client-base >= 0.8.0, < 0.12.0", + "frequenz-channels >= 1.6.1, < 2.0.0", + #"frequenz-client-base >= 0.11.0, < 0.12.0", + "frequenz-client-base @ git+https://github.com/frequenz-floss/frequenz-client-base-python@refs/pull/153/head", "grpcio >= 1.59.0, < 2", - "protobuf >= 4.21.6, < 7", + "protobuf >= 5.29.2, < 7", "timezonefinder >= 6.2.0, < 7", "typing-extensions >= 4.6.0, < 5", ] diff --git a/src/frequenz/client/microgrid/_client.py b/src/frequenz/client/microgrid/_client.py index f1cb65c7..7c7ae10c 100644 --- a/src/frequenz/client/microgrid/_client.py +++ b/src/frequenz/client/microgrid/_client.py @@ -320,7 +320,7 @@ async def _new_component_data_receiver( expected_category: ComponentCategory, transform: Callable[[microgrid_pb2.ComponentData], _ComponentDataT], maxsize: int, - ) -> Receiver[_ComponentDataT]: + ) -> Receiver[_ComponentDataT | streaming.StreamEvent]: """Return a new broadcaster receiver for a given `component_id`. If a broadcaster for the given `component_id` doesn't exist, it creates a new @@ -393,7 +393,7 @@ async def meter_data( # noqa: DOC502 (ValueError is raised indirectly by _expec self, component_id: ComponentId, maxsize: int = RECEIVER_MAX_SIZE, - ) -> Receiver[MeterData]: + ) -> Receiver[MeterData | streaming.StreamEvent]: """Return a channel receiver that provides a `MeterData` stream. Raises: @@ -417,7 +417,7 @@ async def battery_data( # noqa: DOC502 (ValueError is raised indirectly by _exp self, component_id: ComponentId, maxsize: int = RECEIVER_MAX_SIZE, - ) -> Receiver[BatteryData]: + ) -> Receiver[BatteryData | streaming.StreamEvent]: """Return a channel receiver that provides a `BatteryData` stream. Raises: @@ -441,7 +441,7 @@ async def inverter_data( # noqa: DOC502 (ValueError is raised indirectly by _ex self, component_id: ComponentId, maxsize: int = RECEIVER_MAX_SIZE, - ) -> Receiver[InverterData]: + ) -> Receiver[InverterData | streaming.StreamEvent]: """Return a channel receiver that provides an `InverterData` stream. Raises: @@ -465,7 +465,7 @@ async def ev_charger_data( # noqa: DOC502 (ValueError is raised indirectly by _ self, component_id: ComponentId, maxsize: int = RECEIVER_MAX_SIZE, - ) -> Receiver[EVChargerData]: + ) -> Receiver[EVChargerData | streaming.StreamEvent]: """Return a channel receiver that provides an `EvChargeData` stream. Raises: @@ -591,7 +591,7 @@ def stream_sensor_data( metrics: Iterable[SensorMetric | int] | None = None, *, buffer_size: int = 50, - ) -> Receiver[SensorDataSamples]: + ) -> Receiver[SensorDataSamples | streaming.StreamEvent]: """Stream data samples from a sensor. Warning: diff --git a/tests/test_client.py b/tests/test_client.py index e3aff688..8c269552 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -17,6 +17,7 @@ from frequenz.api.common import components_pb2, metrics_pb2 from frequenz.api.microgrid import grid_pb2, inverter_pb2, microgrid_pb2, sensor_pb2 from frequenz.client.base import conversion, retry +from frequenz.client.base.streaming import StreamRetrying, StreamStarted from google.protobuf.empty_pb2 import Empty from frequenz.client.microgrid import ( @@ -688,6 +689,7 @@ async def stream_data( client.mock_stub.StreamComponentData.side_effect = stream_data receiver = await getattr(client, method)(component_id) + assert isinstance(await receiver.receive(), StreamStarted) latest = await receiver.receive() assert isinstance(latest, component_class) assert latest.component_id == component_id @@ -736,14 +738,28 @@ async def stream_data( client.mock_stub.StreamComponentData.side_effect = stream_data receiver = await getattr(client, method)(component_id) + assert isinstance(await receiver.receive(), StreamStarted) + assert isinstance(await receiver.receive(), StreamRetrying) + assert isinstance(await receiver.receive(), StreamStarted) + latest = await receiver.receive() assert isinstance(latest, component_class) assert latest.component_id == component_id + assert isinstance(await receiver.receive(), StreamRetrying) + assert isinstance(await receiver.receive(), StreamStarted) + assert isinstance(await receiver.receive(), StreamRetrying) + assert isinstance(await receiver.receive(), StreamStarted) + latest = await receiver.receive() assert isinstance(latest, component_class) assert latest.component_id == component_id + assert isinstance(await receiver.receive(), StreamRetrying) + assert isinstance(await receiver.receive(), StreamStarted) + assert isinstance(await receiver.receive(), StreamRetrying) + assert isinstance(await receiver.receive(), StreamStarted) + latest = await receiver.receive() assert isinstance(latest, component_class) assert latest.component_id == component_id @@ -947,6 +963,8 @@ async def stream_data_impl( receiver = client.stream_sensor_data( SensorId(sensor201.id), [SensorMetric.TEMPERATURE] ) + + assert isinstance(await receiver.receive(), StreamStarted) sample = await receiver.receive() assert isinstance(sample, SensorDataSamples) @@ -997,6 +1015,8 @@ async def stream_data_impl( client.mock_stub.StreamComponentData.side_effect = stream_data_impl receiver = client.stream_sensor_data(SensorId(sensor201.id)) + + assert isinstance(await receiver.receive(), StreamStarted) sample = await receiver.receive() assert isinstance(sample, SensorDataSamples) @@ -1046,7 +1066,12 @@ async def stream_data_error_impl( receiver = client.stream_sensor_data( SensorId(sensor201.id), [SensorMetric.TEMPERATURE] ) - sample = await receiver.receive() # Should succeed after retries + assert isinstance(await receiver.receive(), StreamStarted) + assert isinstance(await receiver.receive(), StreamRetrying) + assert isinstance(await receiver.receive(), StreamStarted) + assert isinstance(await receiver.receive(), StreamRetrying) + assert isinstance(await receiver.receive(), StreamStarted) + sample = await receiver.receive() # Get the actual sample assert isinstance(sample, SensorDataSamples) assert int(sample.sensor_id) == sensor201.id