diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 0174777..246ae46 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -14,7 +14,7 @@ ## New Features - +* A new method `Dispatcher.refresh()` was added, for when you need to refresh the dispatcher's state. ## Bug Fixes diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index 01b327d..241f7ba 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -213,6 +213,10 @@ async def start(self) -> None: """Start the actor.""" self._actor.start() + async def refresh(self) -> None: + """Re-fetch all dispatches.""" + await self._actor.fetch() + @property def client(self) -> Client: """Return the client.""" diff --git a/src/frequenz/dispatch/actor.py b/src/frequenz/dispatch/actor.py index 2f58845..b435203 100644 --- a/src/frequenz/dispatch/actor.py +++ b/src/frequenz/dispatch/actor.py @@ -22,6 +22,7 @@ """The logger for this module.""" +# pylint: disable=too-many-instance-attributes class DispatchingActor(Actor): """Dispatch actor. @@ -84,12 +85,14 @@ def __init__( always at index 0. """ + self._currently_fetching = False + async def _run(self) -> None: """Run the actor.""" _logger.info("Starting dispatch actor for microgrid %s", self._microgrid_id) # Initial fetch - await self._fetch() + await self.fetch() stream = self._client.stream(microgrid_id=self._microgrid_id) @@ -151,7 +154,7 @@ async def _execute_scheduled_event(self, dispatch: Dispatch) -> None: self._update_timer() - async def _fetch(self) -> None: + async def fetch(self) -> None: """Fetch all relevant dispatches using list. This is used for the initial fetch and for re-fetching all dispatches @@ -160,43 +163,54 @@ async def _fetch(self) -> None: old_dispatches = self._dispatches self._dispatches = {} - try: - _logger.info("Fetching dispatches for microgrid %s", self._microgrid_id) - async for page in self._client.list(microgrid_id=self._microgrid_id): - for client_dispatch in page: - dispatch = Dispatch(client_dispatch) - - self._dispatches[dispatch.id] = Dispatch(client_dispatch) - old_dispatch = old_dispatches.pop(dispatch.id, None) - if not old_dispatch: - _logger.info("New dispatch: %s", dispatch) - await self._update_dispatch_schedule_and_notify(dispatch, None) - await self._lifecycle_updates_sender.send( - Created(dispatch=dispatch) - ) - elif dispatch.update_time != old_dispatch.update_time: - _logger.info("Updated dispatch: %s", dispatch) - await self._update_dispatch_schedule_and_notify( - dispatch, old_dispatch - ) - await self._lifecycle_updates_sender.send( - Updated(dispatch=dispatch) - ) - - except grpc.aio.AioRpcError as error: - _logger.error("Error fetching dispatches: %s", error) - self._dispatches = old_dispatches + if self._currently_fetching: + _logger.debug("Already fetching dispatches, skipping") return - for dispatch in old_dispatches.values(): - _logger.info("Deleted dispatch: %s", dispatch) - await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch)) - await self._update_dispatch_schedule_and_notify(None, dispatch) - - # Set deleted only here as it influences the result of dispatch.running() - # which is used in above in _running_state_change - dispatch._set_deleted() # pylint: disable=protected-access - await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch)) + try: + self._currently_fetching = True + + try: + _logger.info("Fetching dispatches for microgrid %s", self._microgrid_id) + async for page in self._client.list(microgrid_id=self._microgrid_id): + for client_dispatch in page: + dispatch = Dispatch(client_dispatch) + + self._dispatches[dispatch.id] = Dispatch(client_dispatch) + old_dispatch = old_dispatches.pop(dispatch.id, None) + if not old_dispatch: + _logger.info("New dispatch: %s", dispatch) + await self._update_dispatch_schedule_and_notify( + dispatch, None + ) + await self._lifecycle_updates_sender.send( + Created(dispatch=dispatch) + ) + elif dispatch.update_time != old_dispatch.update_time: + _logger.info("Updated dispatch: %s", dispatch) + await self._update_dispatch_schedule_and_notify( + dispatch, old_dispatch + ) + await self._lifecycle_updates_sender.send( + Updated(dispatch=dispatch) + ) + + except grpc.aio.AioRpcError as error: + _logger.error("Error fetching dispatches: %s", error) + self._dispatches = old_dispatches + return + + for dispatch in old_dispatches.values(): + _logger.info("Deleted dispatch: %s", dispatch) + await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch)) + await self._update_dispatch_schedule_and_notify(None, dispatch) + + # Set deleted only here as it influences the result of dispatch.running() + # which is used in above in _running_state_change + dispatch._set_deleted() # pylint: disable=protected-access + await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch)) + finally: + self._currently_fetching = False async def _update_dispatch_schedule_and_notify( self, dispatch: Dispatch | None, old_dispatch: Dispatch | None