Skip to content

Commit 10e6d51

Browse files
committed
Add new method Dispatcher.refresh() to fetch latest dispatches
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 12a3368 commit 10e6d51

File tree

3 files changed

+56
-38
lines changed

3 files changed

+56
-38
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
## New Features
1616

17-
<!-- Here goes the main new features and examples or instructions on how to use them -->
17+
* A new method `Dispatcher.refresh()` was added, for when you need to refresh the dispatcher's state.
1818

1919
## Bug Fixes
2020

src/frequenz/dispatch/_dispatcher.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,10 @@ async def start(self) -> None:
213213
"""Start the actor."""
214214
self._actor.start()
215215

216+
async def refresh(self) -> None:
217+
"""Re-fetch all dispatches."""
218+
await self._actor.fetch()
219+
216220
@property
217221
def client(self) -> Client:
218222
"""Return the client."""

src/frequenz/dispatch/actor.py

Lines changed: 51 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
"""The logger for this module."""
2323

2424

25+
# pylint: disable=too-many-instance-attributes
2526
class DispatchingActor(Actor):
2627
"""Dispatch actor.
2728
@@ -84,12 +85,14 @@ def __init__(
8485
always at index 0.
8586
"""
8687

88+
self._currently_fetching = False
89+
8790
async def _run(self) -> None:
8891
"""Run the actor."""
8992
_logger.info("Starting dispatch actor for microgrid %s", self._microgrid_id)
9093

9194
# Initial fetch
92-
await self._fetch()
95+
await self.fetch()
9396

9497
stream = self._client.stream(microgrid_id=self._microgrid_id)
9598

@@ -151,7 +154,7 @@ async def _execute_scheduled_event(self, dispatch: Dispatch) -> None:
151154

152155
self._update_timer()
153156

154-
async def _fetch(self) -> None:
157+
async def fetch(self) -> None:
155158
"""Fetch all relevant dispatches using list.
156159
157160
This is used for the initial fetch and for re-fetching all dispatches
@@ -160,43 +163,54 @@ async def _fetch(self) -> None:
160163
old_dispatches = self._dispatches
161164
self._dispatches = {}
162165

163-
try:
164-
_logger.info("Fetching dispatches for microgrid %s", self._microgrid_id)
165-
async for page in self._client.list(microgrid_id=self._microgrid_id):
166-
for client_dispatch in page:
167-
dispatch = Dispatch(client_dispatch)
168-
169-
self._dispatches[dispatch.id] = Dispatch(client_dispatch)
170-
old_dispatch = old_dispatches.pop(dispatch.id, None)
171-
if not old_dispatch:
172-
_logger.info("New dispatch: %s", dispatch)
173-
await self._update_dispatch_schedule_and_notify(dispatch, None)
174-
await self._lifecycle_updates_sender.send(
175-
Created(dispatch=dispatch)
176-
)
177-
elif dispatch.update_time != old_dispatch.update_time:
178-
_logger.info("Updated dispatch: %s", dispatch)
179-
await self._update_dispatch_schedule_and_notify(
180-
dispatch, old_dispatch
181-
)
182-
await self._lifecycle_updates_sender.send(
183-
Updated(dispatch=dispatch)
184-
)
185-
186-
except grpc.aio.AioRpcError as error:
187-
_logger.error("Error fetching dispatches: %s", error)
188-
self._dispatches = old_dispatches
166+
if self._currently_fetching:
167+
_logger.debug("Already fetching dispatches, skipping")
189168
return
190169

191-
for dispatch in old_dispatches.values():
192-
_logger.info("Deleted dispatch: %s", dispatch)
193-
await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch))
194-
await self._update_dispatch_schedule_and_notify(None, dispatch)
195-
196-
# Set deleted only here as it influences the result of dispatch.running()
197-
# which is used in above in _running_state_change
198-
dispatch._set_deleted() # pylint: disable=protected-access
199-
await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch))
170+
try:
171+
self._currently_fetching = True
172+
173+
try:
174+
_logger.info("Fetching dispatches for microgrid %s", self._microgrid_id)
175+
async for page in self._client.list(microgrid_id=self._microgrid_id):
176+
for client_dispatch in page:
177+
dispatch = Dispatch(client_dispatch)
178+
179+
self._dispatches[dispatch.id] = Dispatch(client_dispatch)
180+
old_dispatch = old_dispatches.pop(dispatch.id, None)
181+
if not old_dispatch:
182+
_logger.info("New dispatch: %s", dispatch)
183+
await self._update_dispatch_schedule_and_notify(
184+
dispatch, None
185+
)
186+
await self._lifecycle_updates_sender.send(
187+
Created(dispatch=dispatch)
188+
)
189+
elif dispatch.update_time != old_dispatch.update_time:
190+
_logger.info("Updated dispatch: %s", dispatch)
191+
await self._update_dispatch_schedule_and_notify(
192+
dispatch, old_dispatch
193+
)
194+
await self._lifecycle_updates_sender.send(
195+
Updated(dispatch=dispatch)
196+
)
197+
198+
except grpc.aio.AioRpcError as error:
199+
_logger.error("Error fetching dispatches: %s", error)
200+
self._dispatches = old_dispatches
201+
return
202+
203+
for dispatch in old_dispatches.values():
204+
_logger.info("Deleted dispatch: %s", dispatch)
205+
await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch))
206+
await self._update_dispatch_schedule_and_notify(None, dispatch)
207+
208+
# Set deleted only here as it influences the result of dispatch.running()
209+
# which is used in above in _running_state_change
210+
dispatch._set_deleted() # pylint: disable=protected-access
211+
await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch))
212+
finally:
213+
self._currently_fetching = False
200214

201215
async def _update_dispatch_schedule_and_notify(
202216
self, dispatch: Dispatch | None, old_dispatch: Dispatch | None

0 commit comments

Comments
 (0)