Skip to content

Commit 0e58ba8

Browse files
committed
Add new method Dispatcher.refresh() to fetch latest dispatches
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 66a6dad commit 0e58ba8

File tree

4 files changed

+55
-51
lines changed

4 files changed

+55
-51
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
## New Features
1616

17-
<!-- Here goes the main new features and examples or instructions on how to use them -->
17+
* A new method `Dispatcher.refresh()` was added, for when you need to refresh the dispatcher's state.
1818

1919
## Bug Fixes
2020

src/frequenz/dispatch/_dispatcher.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,10 @@ async def start(self) -> None:
210210
"""Start the actor."""
211211
self._actor.start()
212212

213+
async def refresh(self) -> None:
214+
"""Re-fetch all dispatches."""
215+
await self._actor.fetch()
216+
213217
@property
214218
def client(self) -> Client:
215219
"""Return the client."""

src/frequenz/dispatch/actor.py

Lines changed: 49 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
"""The logger for this module."""
2323

2424

25+
# pylint: disable=too-many-instance-attributes
2526
class DispatchingActor(Actor):
2627
"""Dispatch actor.
2728
@@ -84,12 +85,14 @@ def __init__(
8485
always at index 0.
8586
"""
8687

88+
self._currently_fetching = False
89+
8790
async def _run(self) -> None:
8891
"""Run the actor."""
8992
_logger.info("Starting dispatch actor for microgrid %s", self._microgrid_id)
9093

9194
# Initial fetch
92-
await self._fetch()
95+
await self.fetch()
9396

9497
stream = self._client.stream(microgrid_id=self._microgrid_id)
9598

@@ -151,52 +154,58 @@ async def _execute_scheduled_event(self, dispatch: Dispatch) -> None:
151154

152155
self._update_timer()
153156

154-
async def _fetch(self) -> None:
157+
async def fetch(self) -> None:
155158
"""Fetch all relevant dispatches using list.
156159
157160
This is used for the initial fetch and for re-fetching all dispatches
158161
if the connection was lost.
159162
"""
160-
old_dispatches = self._dispatches
161-
self._dispatches = {}
162-
163-
try:
164-
_logger.info("Fetching dispatches for microgrid %s", self._microgrid_id)
165-
async for page in self._client.list(microgrid_id=self._microgrid_id):
166-
for client_dispatch in page:
167-
dispatch = Dispatch(client_dispatch)
168-
169-
self._dispatches[dispatch.id] = Dispatch(client_dispatch)
170-
old_dispatch = old_dispatches.pop(dispatch.id, None)
171-
if not old_dispatch:
172-
_logger.info("New dispatch: %s", dispatch)
173-
await self._update_dispatch_schedule_and_notify(dispatch, None)
174-
await self._lifecycle_updates_sender.send(
175-
Created(dispatch=dispatch)
176-
)
177-
elif dispatch.update_time != old_dispatch.update_time:
178-
_logger.info("Updated dispatch: %s", dispatch)
179-
await self._update_dispatch_schedule_and_notify(
180-
dispatch, old_dispatch
181-
)
182-
await self._lifecycle_updates_sender.send(
183-
Updated(dispatch=dispatch)
184-
)
185-
186-
except grpc.aio.AioRpcError as error:
187-
_logger.error("Error fetching dispatches: %s", error)
188-
self._dispatches = old_dispatches
163+
if self._currently_fetching:
164+
_logger.debug("Already fetching dispatches, skipping")
189165
return
190166

191-
for dispatch in old_dispatches.values():
192-
_logger.info("Deleted dispatch: %s", dispatch)
193-
await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch))
194-
await self._update_dispatch_schedule_and_notify(None, dispatch)
195-
196-
# Set deleted only here as it influences the result of dispatch.started
197-
# which is used in above in _running_state_change
198-
dispatch._set_deleted() # pylint: disable=protected-access
199-
await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch))
167+
try:
168+
self._currently_fetching = True
169+
170+
old_dispatches = self._dispatches
171+
self._dispatches = {}
172+
173+
try:
174+
_logger.info("Fetching dispatches for microgrid %s", self._microgrid_id)
175+
async for page in self._client.list(microgrid_id=self._microgrid_id):
176+
for client_dispatch in page:
177+
dispatch = Dispatch(client_dispatch)
178+
179+
self._dispatches[dispatch.id] = Dispatch(client_dispatch)
180+
old_dispatch = old_dispatches.pop(dispatch.id, None)
181+
if not old_dispatch:
182+
_logger.info("New dispatch: %s", dispatch)
183+
await self._update_dispatch_schedule_and_notify(
184+
dispatch, None
185+
)
186+
await self._lifecycle_updates_sender.send(
187+
Created(dispatch=dispatch)
188+
)
189+
elif dispatch.update_time != old_dispatch.update_time:
190+
_logger.info("Updated dispatch: %s", dispatch)
191+
await self._update_dispatch_schedule_and_notify(
192+
dispatch, old_dispatch
193+
)
194+
await self._lifecycle_updates_sender.send(
195+
Updated(dispatch=dispatch)
196+
)
197+
198+
except grpc.aio.AioRpcError as error:
199+
_logger.error("Error fetching dispatches: %s", error)
200+
self._dispatches = old_dispatches
201+
return
202+
203+
for dispatch in old_dispatches.values():
204+
_logger.info("Deleted dispatch: %s", dispatch)
205+
await self._update_dispatch_schedule_and_notify(None, dispatch)
206+
await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch))
207+
finally:
208+
self._currently_fetching = False
200209

201210
async def _update_dispatch_schedule_and_notify(
202211
self, dispatch: Dispatch | None, old_dispatch: Dispatch | None

tests/test_frequenz_dispatch.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,6 @@ async def _test_new_dispatch_created(
144144
assert False, "Expected a created event"
145145
case Created(dispatch):
146146
received = Dispatch(update_dispatch(sample, dispatch))
147-
received._set_running_status_notified() # pylint: disable=protected-access
148-
dispatch._set_running_status_notified() # pylint: disable=protected-access
149147
assert dispatch == received
150148

151149
return dispatch
@@ -184,10 +182,7 @@ async def test_existing_dispatch_updated(
184182
case Created(dispatch) | Deleted(dispatch):
185183
assert False, f"Expected an updated event, got {dispatch_event}"
186184
case Updated(dispatch):
187-
assert dispatch == Dispatch(
188-
updated,
189-
running_state_change_synced=dispatch.running_state_change_synced,
190-
)
185+
assert dispatch == Dispatch(updated)
191186

192187
await asyncio.sleep(1)
193188

@@ -212,7 +207,6 @@ async def test_existing_dispatch_deleted(
212207
assert False, "Expected a deleted event"
213208
case Deleted(dispatch):
214209
sample._set_deleted() # pylint: disable=protected-access
215-
dispatch._set_running_status_notified() # pylint: disable=protected-access
216210
assert dispatch == sample
217211

218212

@@ -352,9 +346,6 @@ async def test_dispatch_schedule(
352346
# Expect notification of the dispatch being ready to run
353347
ready_dispatch = await actor_env.running_state_change.receive()
354348

355-
# Set flag we expect to be different to compare the dispatch with the one received
356-
dispatch._set_running_status_notified() # pylint: disable=protected-access
357-
358349
assert ready_dispatch == dispatch
359350

360351
assert dispatch.duration is not None

0 commit comments

Comments
 (0)