|
17 | 17 | from frequenz.api.common import components_pb2, metrics_pb2 |
18 | 18 | from frequenz.api.microgrid import grid_pb2, inverter_pb2, microgrid_pb2, sensor_pb2 |
19 | 19 | from frequenz.client.base import conversion, retry |
| 20 | +from frequenz.client.base.streaming import StreamRetrying, StreamStarted |
20 | 21 | from google.protobuf.empty_pb2 import Empty |
21 | 22 |
|
22 | 23 | from frequenz.client.microgrid import ( |
@@ -688,6 +689,7 @@ async def stream_data( |
688 | 689 |
|
689 | 690 | client.mock_stub.StreamComponentData.side_effect = stream_data |
690 | 691 | receiver = await getattr(client, method)(component_id) |
| 692 | + assert isinstance(await receiver.receive(), StreamStarted) |
691 | 693 | latest = await receiver.receive() |
692 | 694 | assert isinstance(latest, component_class) |
693 | 695 | assert latest.component_id == component_id |
@@ -736,14 +738,28 @@ async def stream_data( |
736 | 738 |
|
737 | 739 | client.mock_stub.StreamComponentData.side_effect = stream_data |
738 | 740 | receiver = await getattr(client, method)(component_id) |
| 741 | + assert isinstance(await receiver.receive(), StreamStarted) |
| 742 | + assert isinstance(await receiver.receive(), StreamRetrying) |
| 743 | + assert isinstance(await receiver.receive(), StreamStarted) |
| 744 | + |
739 | 745 | latest = await receiver.receive() |
740 | 746 | assert isinstance(latest, component_class) |
741 | 747 | assert latest.component_id == component_id |
742 | 748 |
|
| 749 | + assert isinstance(await receiver.receive(), StreamRetrying) |
| 750 | + assert isinstance(await receiver.receive(), StreamStarted) |
| 751 | + assert isinstance(await receiver.receive(), StreamRetrying) |
| 752 | + assert isinstance(await receiver.receive(), StreamStarted) |
| 753 | + |
743 | 754 | latest = await receiver.receive() |
744 | 755 | assert isinstance(latest, component_class) |
745 | 756 | assert latest.component_id == component_id |
746 | 757 |
|
| 758 | + assert isinstance(await receiver.receive(), StreamRetrying) |
| 759 | + assert isinstance(await receiver.receive(), StreamStarted) |
| 760 | + assert isinstance(await receiver.receive(), StreamRetrying) |
| 761 | + assert isinstance(await receiver.receive(), StreamStarted) |
| 762 | + |
747 | 763 | latest = await receiver.receive() |
748 | 764 | assert isinstance(latest, component_class) |
749 | 765 | assert latest.component_id == component_id |
@@ -947,6 +963,8 @@ async def stream_data_impl( |
947 | 963 | receiver = client.stream_sensor_data( |
948 | 964 | SensorId(sensor201.id), [SensorMetric.TEMPERATURE] |
949 | 965 | ) |
| 966 | + |
| 967 | + assert isinstance(await receiver.receive(), StreamStarted) |
950 | 968 | sample = await receiver.receive() |
951 | 969 |
|
952 | 970 | assert isinstance(sample, SensorDataSamples) |
@@ -997,6 +1015,8 @@ async def stream_data_impl( |
997 | 1015 |
|
998 | 1016 | client.mock_stub.StreamComponentData.side_effect = stream_data_impl |
999 | 1017 | receiver = client.stream_sensor_data(SensorId(sensor201.id)) |
| 1018 | + |
| 1019 | + assert isinstance(await receiver.receive(), StreamStarted) |
1000 | 1020 | sample = await receiver.receive() |
1001 | 1021 |
|
1002 | 1022 | assert isinstance(sample, SensorDataSamples) |
@@ -1046,7 +1066,12 @@ async def stream_data_error_impl( |
1046 | 1066 | receiver = client.stream_sensor_data( |
1047 | 1067 | SensorId(sensor201.id), [SensorMetric.TEMPERATURE] |
1048 | 1068 | ) |
1049 | | - sample = await receiver.receive() # Should succeed after retries |
| 1069 | + assert isinstance(await receiver.receive(), StreamStarted) |
| 1070 | + assert isinstance(await receiver.receive(), StreamRetrying) |
| 1071 | + assert isinstance(await receiver.receive(), StreamStarted) |
| 1072 | + assert isinstance(await receiver.receive(), StreamRetrying) |
| 1073 | + assert isinstance(await receiver.receive(), StreamStarted) |
| 1074 | + sample = await receiver.receive() # Get the actual sample |
1050 | 1075 |
|
1051 | 1076 | assert isinstance(sample, SensorDataSamples) |
1052 | 1077 | assert int(sample.sensor_id) == sensor201.id |
|
0 commit comments