diff --git a/src/frequenz/client/dispatch/_client.py b/src/frequenz/client/dispatch/_client.py index ab6fd8af..ed6cc0ed 100644 --- a/src/frequenz/client/dispatch/_client.py +++ b/src/frequenz/client/dispatch/_client.py @@ -242,9 +242,7 @@ def _get_stream( ) -> GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent]: """Get an instance to the streaming helper.""" broadcaster = self._streams.get(microgrid_id) - # pylint: disable=protected-access - if broadcaster is not None and broadcaster._channel.is_closed: - # pylint: enable=protected-access + if broadcaster is not None and not broadcaster.is_running: del self._streams[microgrid_id] broadcaster = None if broadcaster is None: diff --git a/tests/test_client.py b/tests/test_client.py index 8ee16eab..df80ca1a 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -10,6 +10,7 @@ from functools import partial import grpc +import pytest from pytest import raises from frequenz.client.dispatch.test.client import FakeClient, to_create_params @@ -302,13 +303,20 @@ async def test_delete_dispatch_fail(client: FakeClient) -> None: await client.delete(microgrid_id=1, dispatch_id=1) -async def test_dispatch_stream(client: FakeClient, sample: Dispatch) -> None: +@pytest.mark.parametrize("call_twice", [True, False]) +async def test_dispatch_stream( + client: FakeClient, sample: Dispatch, call_twice: bool +) -> None: """Test dispatching a stream of dispatches.""" microgrid_id = random.randint(1, 100) dispatches = [sample, sample, sample] stream = client.stream(microgrid_id) + if call_twice: + # Call function again to test behavior if stream already exists + _ = client.stream(microgrid_id) + async def expect(dispatch: Dispatch, event: Event) -> None: message = await stream.receive() assert message.dispatch == dispatch