Skip to content

Commit ba62a7a

Browse files
committed
Reconnect stream if there has been no event in 5 minutes
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent ce80d90 commit ba62a7a

File tree

2 files changed

+16
-0
lines changed

2 files changed

+16
-0
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
* This release now supports the sdk up to rc2000.
1414
* Less logs are now on `INFO` level, and more on `DEBUG` level, making the output less verbose.
15+
* If there has been no dispatch event for 5 minutes, the dispatch client will force-reconnect the stream
1516

1617
## Bug Fixes
1718

src/frequenz/dispatch/_bg_service.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,9 @@ def __init__(
132132
self._initial_fetch_event = asyncio.Event()
133133
"""The initial fetch event."""
134134

135+
self._last_stream_event = datetime.now(timezone.utc)
136+
"""Time of the last stream event, used to detect connection issues."""
137+
135138
async def wait_for_initialization(self) -> None:
136139
"""Wait for the initial fetch to complete."""
137140
await self._initial_fetch_event.wait()
@@ -234,17 +237,29 @@ async def _run(self) -> None:
234237
await self._fetch()
235238

236239
stream = self._client.stream(microgrid_id=self._microgrid_id)
240+
max_event_inactivity = timedelta(minutes=5)
237241

238242
# Streaming updates
239243
async for selected in select(self._next_event_timer, stream):
244+
now = datetime.now(timezone.utc)
240245
if selected_from(selected, self._next_event_timer):
246+
if self._last_stream_event + max_event_inactivity < now:
247+
_logger.info("No stream events for %s minutes, restarting stream")
248+
# This restarts the stream task but keeps the channel
249+
# pylint: disable=protected-access
250+
broadcaster = self._client._get_stream(self._microgrid_id)
251+
broadcaster._task.cancel()
252+
broadcaster._task = asyncio.create_task(self._run())
253+
self._last_stream_event = now
254+
# pylint: enable=protected-access
241255
if not self._scheduled_events:
242256
continue
243257
await self._execute_scheduled_event(
244258
heappop(self._scheduled_events).dispatch
245259
)
246260
elif selected_from(selected, stream):
247261
_logger.debug("Received dispatch event: %s", selected.message)
262+
self._last_stream_event = now
248263
dispatch = Dispatch(selected.message.dispatch)
249264
match selected.message.event:
250265
case Event.CREATED:

0 commit comments

Comments
 (0)