|
17 | 17 | ConnectionFilter, |
18 | 18 | ConnectionList, |
19 | 19 | PowerLevelParam, |
| 20 | + SetBoundsParam, |
20 | 21 | ) |
21 | 22 | from google.protobuf.empty_pb2 import Empty |
| 23 | +from google.protobuf.timestamp_pb2 import Timestamp |
22 | 24 |
|
23 | 25 | # pylint: enable=no-name-in-module |
24 | 26 | from pytest_mock import MockerFixture |
@@ -119,3 +121,31 @@ def mock_set_power( |
119 | 121 | assert err_ctx.value.code() == grpc.StatusCode.DEADLINE_EXCEEDED |
120 | 122 |
|
121 | 123 | assert await server.graceful_shutdown() |
| 124 | + |
| 125 | + |
| 126 | +async def test_set_bounds_timeout(mocker: MockerFixture) -> None: |
| 127 | + """Test if the set_power() method properly raises a timeout AioRpcError.""" |
| 128 | + servicer = MockMicrogridServicer() |
| 129 | + |
| 130 | + def mock_add_inclusion_bounds( |
| 131 | + request: SetBoundsParam, context: Any # pylint: disable=unused-argument |
| 132 | + ) -> Timestamp: |
| 133 | + time.sleep(GRPC_SERVER_DELAY) |
| 134 | + return Timestamp() |
| 135 | + |
| 136 | + mocker.patch.object(servicer, "AddInclusionBounds", mock_add_inclusion_bounds) |
| 137 | + server = MockGrpcServer(servicer, port=57809) |
| 138 | + await server.start() |
| 139 | + |
| 140 | + target = "[::]:57809" |
| 141 | + grpc_channel = grpc.aio.insecure_channel(target) |
| 142 | + client = ApiClient(grpc_channel=grpc_channel, target=target) |
| 143 | + |
| 144 | + bounds_values = [{"lower": 0.0, "upper": 100.0}, {"lower": -10.0, "upper": 1.0}] |
| 145 | + |
| 146 | + for bounds in bounds_values: |
| 147 | + with pytest.raises(grpc.aio.AioRpcError) as err_ctx: |
| 148 | + await client.set_bounds(component_id=1, **bounds) |
| 149 | + assert err_ctx.value.code() == grpc.StatusCode.DEADLINE_EXCEEDED |
| 150 | + |
| 151 | + assert await server.graceful_shutdown() |
0 commit comments