From a696d436e72176830d3f21ea4e3ea4c577f01bb1 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Wed, 21 May 2025 18:34:08 +0200 Subject: [PATCH 1/2] Add support to restart a stream Signed-off-by: Mathias L. Baumann --- RELEASE_NOTES.md | 1 + src/frequenz/client/base/streaming.py | 13 +++++++++++++ 2 files changed, 14 insertions(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 57d8b38..54ec3d1 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -3,6 +3,7 @@ ## Features * Added support for HMAC signing of client messages +* Added support for streams to be restarted without their channel being closed ## Upgrading diff --git a/src/frequenz/client/base/streaming.py b/src/frequenz/client/base/streaming.py index a97b931..af81a70 100644 --- a/src/frequenz/client/base/streaming.py +++ b/src/frequenz/client/base/streaming.py @@ -77,6 +77,19 @@ def new_receiver( limit=maxsize, warn_on_overflow=warn_on_overflow ) + def reconnect(self) -> None: + """Reconnect to the stream. + + This will cancel the current task and create a new one. + """ + if not self._task.done(): + _logger.warning( + "%s: reconnecting to the stream, cancelling the current task", + self._stream_name, + ) + self._task.cancel() + self._task = asyncio.create_task(self._run()) + @property def is_running(self) -> bool: """Return whether the streaming helper is running. From 2053db25e85fd5c99edbed6e88c6fe8e7096652d Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Thu, 22 May 2025 15:48:43 +0200 Subject: [PATCH 2/2] Update src/frequenz/client/base/streaming.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Mathias L. Baumann --- src/frequenz/client/base/streaming.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/frequenz/client/base/streaming.py b/src/frequenz/client/base/streaming.py index af81a70..7634ca5 100644 --- a/src/frequenz/client/base/streaming.py +++ b/src/frequenz/client/base/streaming.py @@ -88,6 +88,10 @@ def reconnect(self) -> None: self._stream_name, ) self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass self._task = asyncio.create_task(self._run()) @property