Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/frequenz/client/dispatch/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,7 @@ def _get_stream(
) -> GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent]:
"""Get an instance to the streaming helper."""
broadcaster = self._streams.get(microgrid_id)
# pylint: disable=protected-access
if broadcaster is not None and broadcaster._channel.is_closed:
# pylint: enable=protected-access
if broadcaster is not None and not broadcaster.is_running:
del self._streams[microgrid_id]
broadcaster = None
if broadcaster is None:
Expand Down
10 changes: 9 additions & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from functools import partial

import grpc
import pytest
from pytest import raises

from frequenz.client.dispatch.test.client import FakeClient, to_create_params
Expand Down Expand Up @@ -302,13 +303,20 @@ async def test_delete_dispatch_fail(client: FakeClient) -> None:
await client.delete(microgrid_id=1, dispatch_id=1)


async def test_dispatch_stream(client: FakeClient, sample: Dispatch) -> None:
@pytest.mark.parametrize("call_twice", [True, False])
async def test_dispatch_stream(
client: FakeClient, sample: Dispatch, call_twice: bool
) -> None:
"""Test dispatching a stream of dispatches."""
microgrid_id = random.randint(1, 100)
dispatches = [sample, sample, sample]

stream = client.stream(microgrid_id)

if call_twice:
# Call function again to test behavior if stream already exists
_ = client.stream(microgrid_id)

async def expect(dispatch: Dispatch, event: Event) -> None:
message = await stream.receive()
assert message.dispatch == dispatch
Expand Down
Loading