diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 57d8b38..54ec3d1 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -3,6 +3,7 @@ ## Features * Added support for HMAC signing of client messages +* Added support for streams to be restarted without their channel being closed ## Upgrading diff --git a/src/frequenz/client/base/streaming.py b/src/frequenz/client/base/streaming.py index a97b931..7634ca5 100644 --- a/src/frequenz/client/base/streaming.py +++ b/src/frequenz/client/base/streaming.py @@ -77,6 +77,23 @@ def new_receiver( limit=maxsize, warn_on_overflow=warn_on_overflow ) + def reconnect(self) -> None: + """Reconnect to the stream. + + This will cancel the current task and create a new one. + """ + if not self._task.done(): + _logger.warning( + "%s: reconnecting to the stream, cancelling the current task", + self._stream_name, + ) + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + self._task = asyncio.create_task(self._run()) + @property def is_running(self) -> bool: """Return whether the streaming helper is running.