Skip to content

Commit a79b3e3

Browse files
committed
Reconnect stream if there has been no event in 5 minutes
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent ce80d90 commit a79b3e3

File tree

3 files changed

+18
-1
lines changed

3 files changed

+18
-1
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
* This release now supports the sdk up to rc2000.
1414
* Fewer logs are now on `INFO` level, and more on `DEBUG` level, making the output less verbose.
15+
* Introduced a timeout for the stream, combined with a periodic refresh on reconnect. This makes the stream more resilient to network issues.
1516

1617
## Bug Fixes
1718

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ dependencies = [
4040
# plugins.mkdocstrings.handlers.python.import)
4141
"frequenz-sdk >= 1.0.0-rc1302, < 1.0.0-rc2000",
4242
"frequenz-channels >= 1.6.1, < 2.0.0",
43-
"frequenz-client-dispatch >= 0.10.1, < 0.11.0",
43+
"frequenz-client-dispatch >= 0.10.2, < 0.11.0",
4444
]
4545
dynamic = ["version"]
4646

src/frequenz/dispatch/_bg_service.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,9 @@ def __init__(
132132
self._initial_fetch_event = asyncio.Event()
133133
"""The initial fetch event."""
134134

135+
self._last_refresh = datetime.now(timezone.utc)
136+
"""Time of the last stream event, used to detect connection issues."""
137+
135138
async def wait_for_initialization(self) -> None:
136139
"""Wait for the initial fetch to complete."""
137140
await self._initial_fetch_event.wait()
@@ -234,17 +237,30 @@ async def _run(self) -> None:
234237
await self._fetch()
235238

236239
stream = self._client.stream(microgrid_id=self._microgrid_id)
240+
stream_reconnect_period = self._client.stream_timeout
237241

238242
# Streaming updates
239243
async for selected in select(self._next_event_timer, stream):
244+
now = datetime.now(timezone.utc)
240245
if selected_from(selected, self._next_event_timer):
246+
if self._last_refresh + stream_reconnect_period < now:
247+
_logger.info(
248+
"Periodic refresh of dispatches for microgrid %s",
249+
self._microgrid_id,
250+
)
251+
# When _client.stream reconnects after the timeout, perform
252+
# a refresh to ensure we didn't miss any updates
253+
await self._fetch()
254+
self._last_refresh = now
255+
# pylint: enable=protected-access
241256
if not self._scheduled_events:
242257
continue
243258
await self._execute_scheduled_event(
244259
heappop(self._scheduled_events).dispatch
245260
)
246261
elif selected_from(selected, stream):
247262
_logger.debug("Received dispatch event: %s", selected.message)
263+
self._last_refresh = now
248264
dispatch = Dispatch(selected.message.dispatch)
249265
match selected.message.event:
250266
case Event.CREATED:

0 commit comments

Comments
 (0)