Skip to content

Commit 4fbe8a4

Browse files
authored
Disconnect dispatcher at cleanup
1 parent 30e84b8 commit 4fbe8a4

File tree

2 files changed

+31
-5
lines changed

2 files changed

+31
-5
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@
1414

1515
## Bug Fixes
1616

17-
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
17+
- Fixed an issue in the `Dispatcher` class where the client connection was not properly disconnected during cleanup, potentially causing unclosed socket errors.

src/frequenz/dispatch/_dispatcher.py

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import logging
1010
from asyncio import Event
1111
from datetime import timedelta
12+
from types import TracebackType
1213
from typing import Awaitable, Callable, Self
1314

1415
from frequenz.channels import Receiver
@@ -221,6 +222,7 @@ def __init__(
221222
self._actor_dispatchers: dict[str, ActorDispatcher] = {}
222223
self._empty_event = Event()
223224
self._empty_event.set()
225+
self._disconnecting_futurgite: asyncio.Future[None] | None = None
224226

225227
@override
226228
def start(self) -> None:
@@ -235,19 +237,23 @@ def is_running(self) -> bool:
235237

236238
@override
237239
async def wait(self) -> None:
238-
"""Wait until all actor dispatches are stopped."""
239-
await asyncio.gather(self._bg_service.wait(), self._empty_event.wait())
240+
"""Wait until all actor dispatches are stopped and client is disconnected."""
241+
if self._disconnecting_future is not None:
242+
await self._disconnecting_future
240243

244+
await asyncio.gather(self._bg_service.wait(), self._empty_event.wait())
241245
self._actor_dispatchers.clear()
242246

243-
@override
244247
def cancel(self, msg: str | None = None) -> None:
245-
"""Stop the local dispatch service."""
248+
"""Stop the local dispatch service and initiate client disconnection."""
246249
self._bg_service.cancel(msg)
247250

248251
for instance in self._actor_dispatchers.values():
249252
instance.cancel()
250253

254+
# Initiate client disconnection asynchronously
255+
self._disconnecting_future = asyncio.ensure_future(self._client.disconnect())
256+
251257
async def wait_for_initialization(self) -> None:
252258
"""Wait until the background service is initialized."""
253259
await self._bg_service.wait_for_initialization()
@@ -358,9 +364,29 @@ async def __aenter__(self) -> Self:
358364
This background service.
359365
"""
360366
await super().__aenter__()
367+
await self._client.__aenter__()
361368
await self.wait_for_initialization()
362369
return self
363370

371+
@override
372+
async def __aexit__(
373+
self,
374+
exc_type: type[BaseException] | None,
375+
exc_value: BaseException | None,
376+
traceback: TracebackType | None,
377+
) -> None:
378+
"""Exit an async context.
379+
380+
Stop this background service and disconnect the client.
381+
382+
Args:
383+
exc_type: The exception type.
384+
exc_value: The exception value.
385+
traceback: The traceback object.
386+
"""
387+
await self._client.__aexit__(exc_type, exc_value, traceback)
388+
await self._client.disconnect()
389+
364390
def new_lifecycle_events_receiver(
365391
self, dispatch_type: str
366392
) -> Receiver[DispatchEvent]:

0 commit comments

Comments
 (0)