-
Couldn't load subscription status.
- Fork 6
Disconnect dispatcher at cleanup #146
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is a safer/more complete/future-proof approach. Didn't think too much about the order, but I think it should be fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As @ela-kotulska-frequenz pointed out this also misses disconnecting at stop. If we implement it there, it should come for free to __aexit__() as the default implementation of __aexit__() for BackgroundService is to call stop() (stop() calls self.cancel() + await self.wait()).
For now a "simple" solution when we need to do something async in cancel() (like the call to self._client.disconnect() would be to store the future, like:
def cancel():
self._disconnecting_future = self._client.disconnect()
def wait():
if self._disconnecting_future is not None:
await self._disconnecting_future
await super(self).wait()Not super nice, as the disconnection won't really happen until the await, I think, but it should work for most cases. To do it right we would need to spawn a new task just to do the disconnection :-/
070b1e1 to
12b5e43
Compare
|
Still missing the stop() part |
src/frequenz/dispatch/_dispatcher.py
Outdated
| async def stop(self, msg: str | None = None) -> None: | ||
| """Stop the local dispatch service and initiate client disconnection. | ||
| This method is called when the dispatcher is stopped. | ||
| Args: | ||
| msg: The message to log. | ||
| """ | ||
| _logger.debug("Stopping dispatcher") | ||
| await self._bg_service.stop(msg) | ||
|
|
||
| for type in self._actor_dispatchers.keys(): | ||
| await self.stop_managing(type) | ||
|
|
||
| await self._client.disconnect() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once you "fixed" cancel() and wait() there should be no need to override stop() too. In this case I guess you should move the actor stopping to cancel() too. And once you do that, maybe it makes sense to actually spawn a new task to do the cleanup. not sure, I guess you can also have a list of futures to wait for in the wait(), but you will not actually initiate the cleanup upon calling cancel(), only when calling wait().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm so basically just remove stop() now? The actor stopping is already in cancel (using .cancel)..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then I guess so, I hope it works. Stopping/cleanup is pretty difficult/complicated at the moment, I hope we can improve things soon.
Signed-off-by: Mathias L. Baumann <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I praise the lord that this works 🙏 😆
fixes #145