Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Features

* Added support for HMAC signing of client messages
* Added support for streams to be restarted without their channel being closed

## Upgrading

Expand Down
17 changes: 17 additions & 0 deletions src/frequenz/client/base/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,23 @@ def new_receiver(
limit=maxsize, warn_on_overflow=warn_on_overflow
)

def reconnect(self) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would call this restart_stream() for extra clarity.

"""Reconnect to the stream.

This will cancel the current task and create a new one.
"""
if not self._task.done():
_logger.warning(
"%s: reconnecting to the stream, cancelling the current task",
self._stream_name,
)
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = asyncio.create_task(self._run())

@property
def is_running(self) -> bool:
"""Return whether the streaming helper is running.
Expand Down
Loading