Skip to content

Commit 16fa2a0

Browse files
committed
Disconnect dispatcher at cleanup
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 75b9793 commit 16fa2a0

File tree

2 files changed

+29
-5
lines changed

2 files changed

+29
-5
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@
1515
## Bug Fixes
1616

1717
* Fixes reconnecting after connection loss for streams
18+
* Fixed an issue in the `Dispatcher` class where the client connection was not properly disconnected during cleanup, potentially causing unclosed socket errors.

src/frequenz/dispatch/_dispatcher.py

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -220,12 +220,13 @@ def __init__(
220220
)
221221
self._actor_dispatchers: dict[str, ActorDispatcher] = {}
222222
self._empty_event = Event()
223-
self._empty_event.set()
223+
self._disconnecting_future: asyncio.Future[None] | None = None
224224

225225
@override
226226
def start(self) -> None:
227227
"""Start the local dispatch service."""
228228
self._bg_service.start()
229+
self._empty_event.set()
229230

230231
@property
231232
@override
@@ -235,19 +236,23 @@ def is_running(self) -> bool:
235236

236237
@override
237238
async def wait(self) -> None:
238-
"""Wait until all actor dispatches are stopped."""
239-
await asyncio.gather(self._bg_service.wait(), self._empty_event.wait())
239+
"""Wait until all actor dispatches are stopped and client is disconnected."""
240+
if self._disconnecting_future is not None:
241+
await self._disconnecting_future
240242

243+
await asyncio.gather(self._bg_service.wait(), self._empty_event.wait())
241244
self._actor_dispatchers.clear()
242245

243-
@override
244246
def cancel(self, msg: str | None = None) -> None:
245-
"""Stop the local dispatch service."""
247+
"""Stop the local dispatch service and initiate client disconnection."""
246248
self._bg_service.cancel(msg)
247249

248250
for instance in self._actor_dispatchers.values():
249251
instance.cancel()
250252

253+
# Initiate client disconnection asynchronously
254+
self._disconnecting_future = asyncio.ensure_future(self._client.disconnect())
255+
251256
async def wait_for_initialization(self) -> None:
252257
"""Wait until the background service is initialized."""
253258
await self._bg_service.wait_for_initialization()
@@ -358,9 +363,27 @@ async def __aenter__(self) -> Self:
358363
This background service.
359364
"""
360365
await super().__aenter__()
366+
await self._client.__aenter__()
361367
await self.wait_for_initialization()
362368
return self
363369

370+
async def stop(self, msg: str | None = None) -> None:
371+
"""Stop the local dispatch service and initiate client disconnection.
372+
373+
This method is called when the dispatcher is stopped.
374+
375+
Args:
376+
msg: The message to log.
377+
"""
378+
_logger.debug("Stopping dispatcher")
379+
await self._bg_service.stop(msg)
380+
381+
for type in self._actor_dispatchers.keys():
382+
await self.stop_managing(type)
383+
384+
await self._client.disconnect()
385+
await super().stop(msg)
386+
364387
def new_lifecycle_events_receiver(
365388
self, dispatch_type: str
366389
) -> Receiver[DispatchEvent]:

0 commit comments

Comments
 (0)