diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index afb4fd9b..09460d26 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -11,6 +11,9 @@ ## New Features * `dispatch-cli` supports now the parameter `--type` and `--running` to filter the list of running services by type and status, respectively. +* Every call now has a default timeout of 60 seconds, streams terminate after five minutes. This can be influenced by the two new parameters for`DispatchApiClient.__init__()`: + * `default_timeout: timedelta` (default: 60 seconds) + * `stream_timeout: timedelta` (default: 5 minutes) ## Bug Fixes diff --git a/src/frequenz/client/dispatch/_client.py b/src/frequenz/client/dispatch/_client.py index 34603f70..356189ad 100644 --- a/src/frequenz/client/dispatch/_client.py +++ b/src/frequenz/client/dispatch/_client.py @@ -53,12 +53,15 @@ class DispatchApiClient(BaseApiClient[dispatch_pb2_grpc.MicrogridDispatchServiceStub]): """Dispatch API client.""" + # pylint: disable-next=too-many-arguments def __init__( self, *, server_url: str, key: str, connect: bool = True, + call_timeout: timedelta = timedelta(seconds=60), + stream_timeout: timedelta = timedelta(minutes=5), ) -> None: """Initialize the client. @@ -66,6 +69,8 @@ def __init__( server_url: The URL of the server to connect to. key: API key to use for authentication. connect: Whether to connect to the service immediately. + call_timeout: Timeout for gRPC calls, default is 60 seconds. + stream_timeout: Timeout for gRPC streams, default is 5 minutes. """ super().__init__( server_url, @@ -82,6 +87,19 @@ def __init__( ] = {} """A dictionary of streamers, keyed by microgrid_id.""" + self._call_timeout_seconds = call_timeout.total_seconds() + self._stream_timeout_seconds = stream_timeout.total_seconds() + + @property + def call_timeout(self) -> timedelta: + """Get the call timeout.""" + return timedelta(seconds=self._call_timeout_seconds) + + @property + def stream_timeout(self) -> timedelta: + """Get the stream timeout.""" + return timedelta(seconds=self._stream_timeout_seconds) + @property def stub(self) -> dispatch_pb2_grpc.MicrogridDispatchServiceAsyncStub: """The stub for the service.""" @@ -177,7 +195,9 @@ def to_interval( while True: response = await cast( Awaitable[ListMicrogridDispatchesResponse], - self.stub.ListMicrogridDispatches(request, metadata=self._metadata), + self.stub.ListMicrogridDispatches( + request, metadata=self._metadata, timeout=self._call_timeout_seconds + ), ) yield (Dispatch.from_protobuf(dispatch) for dispatch in response.dispatches) @@ -234,7 +254,9 @@ def _get_stream( stream_method=lambda: cast( AsyncIterator[StreamMicrogridDispatchesResponse], self.stub.StreamMicrogridDispatches( - request, metadata=self._metadata + request, + metadata=self._metadata, + timeout=self._stream_timeout_seconds, ), ), transform=DispatchEvent.from_protobuf, @@ -303,7 +325,9 @@ async def create( # pylint: disable=too-many-positional-arguments response = await cast( Awaitable[CreateMicrogridDispatchResponse], self.stub.CreateMicrogridDispatch( - request.to_protobuf(), metadata=self._metadata + request.to_protobuf(), + metadata=self._metadata, + timeout=self._call_timeout_seconds, ), ) @@ -394,7 +418,9 @@ async def update( response = await cast( Awaitable[UpdateMicrogridDispatchResponse], - self.stub.UpdateMicrogridDispatch(msg, metadata=self._metadata), + self.stub.UpdateMicrogridDispatch( + msg, metadata=self._metadata, timeout=self._call_timeout_seconds + ), ) return Dispatch.from_protobuf(response.dispatch) @@ -414,7 +440,9 @@ async def get(self, *, microgrid_id: int, dispatch_id: int) -> Dispatch: ) response = await cast( Awaitable[GetMicrogridDispatchResponse], - self.stub.GetMicrogridDispatch(request, metadata=self._metadata), + self.stub.GetMicrogridDispatch( + request, metadata=self._metadata, timeout=self._call_timeout_seconds + ), ) return Dispatch.from_protobuf(response.dispatch) @@ -430,5 +458,7 @@ async def delete(self, *, microgrid_id: int, dispatch_id: int) -> None: ) await cast( Awaitable[None], - self.stub.DeleteMicrogridDispatch(request, metadata=self._metadata), + self.stub.DeleteMicrogridDispatch( + request, metadata=self._metadata, timeout=self._call_timeout_seconds + ), ) diff --git a/src/frequenz/client/dispatch/test/_service.py b/src/frequenz/client/dispatch/test/_service.py index 27e8fbda..06146dd9 100644 --- a/src/frequenz/client/dispatch/test/_service.py +++ b/src/frequenz/client/dispatch/test/_service.py @@ -112,13 +112,17 @@ def _check_access(self, metadata: grpc.aio.Metadata) -> None: # pylint: disable=invalid-name async def ListMicrogridDispatches( - self, request: ListMicrogridDispatchesRequest, metadata: grpc.aio.Metadata + self, + request: ListMicrogridDispatchesRequest, + metadata: grpc.aio.Metadata, + timeout: int = 5, # pylint: disable=unused-argument ) -> ListMicrogridDispatchesResponse: """List microgrid dispatches. Args: request: The request. metadata: The metadata. + timeout: timeout for the request, ignored in this mock. Returns: The dispatch list. @@ -141,13 +145,17 @@ async def ListMicrogridDispatches( ) async def StreamMicrogridDispatches( - self, request: StreamMicrogridDispatchesRequest, metadata: grpc.aio.Metadata + self, + request: StreamMicrogridDispatchesRequest, + metadata: grpc.aio.Metadata, + timeout: int = 5, # pylint: disable=unused-argument ) -> AsyncIterator[StreamMicrogridDispatchesResponse]: """Stream microgrid dispatches changes. Args: request: The request. metadata: The metadata. + timeout: timeout for the request, ignored in this mock. Returns: An async generator for dispatch changes. @@ -212,6 +220,7 @@ async def CreateMicrogridDispatch( self, request: PBDispatchCreateRequest, metadata: grpc.aio.Metadata, + timeout: int = 5, # pylint: disable=unused-argument ) -> CreateMicrogridDispatchResponse: """Create a new dispatch.""" self._check_access(metadata) @@ -240,6 +249,7 @@ async def UpdateMicrogridDispatch( self, request: UpdateMicrogridDispatchRequest, metadata: grpc.aio.Metadata, + timeout: int = 5, # pylint: disable=unused-argument ) -> UpdateMicrogridDispatchResponse: """Update a dispatch.""" self._check_access(metadata) @@ -327,6 +337,7 @@ async def GetMicrogridDispatch( self, request: GetMicrogridDispatchRequest, metadata: grpc.aio.Metadata, + timeout: int = 5, # pylint: disable=unused-argument ) -> GetMicrogridDispatchResponse: """Get a single dispatch.""" self._check_access(metadata) @@ -349,6 +360,7 @@ async def DeleteMicrogridDispatch( self, request: DeleteMicrogridDispatchRequest, metadata: grpc.aio.Metadata, + timeout: int = 5, # pylint: disable=unused-argument ) -> Empty: """Delete a given dispatch.""" self._check_access(metadata)