Skip to content

Commit e73a0b1

Browse files
committed
Properly start/stop timer in DispatchScheduler
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 6e1f346 commit e73a0b1

File tree

4 files changed

+69
-53
lines changed

4 files changed

+69
-53
lines changed

src/frequenz/dispatch/_actor_dispatcher.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@
1212

1313
from frequenz.channels import Broadcast, Receiver, Sender, select
1414
from frequenz.channels.timer import SkipMissedAndDrift, Timer
15-
from frequenz.client.common.microgrid.components import ComponentCategory
16-
from frequenz.client.microgrid import ComponentId
15+
from frequenz.client.common.microgrid.components import ComponentCategory, ComponentId
1716
from frequenz.sdk.actor import Actor, BackgroundService
1817

1918
from ._dispatch import Dispatch

src/frequenz/dispatch/_bg_service.py

Lines changed: 65 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import logging
1111
from abc import ABC, abstractmethod
1212
from collections.abc import Mapping
13+
from contextlib import closing
1314
from dataclasses import dataclass, field
1415
from datetime import datetime, timedelta, timezone
1516
from heapq import heappop, heappush
@@ -113,13 +114,6 @@ def __init__(
113114
)
114115

115116
self._running_state_status_tx = self._running_state_status_channel.new_sender()
116-
self._next_event_timer = Timer(
117-
timedelta(seconds=100), SkipMissedAndResync(), auto_start=False
118-
)
119-
"""The timer to schedule the next event.
120-
121-
Interval is chosen arbitrarily, as it will be reset on the first event.
122-
"""
123117

124118
self._scheduled_events: list["DispatchScheduler.QueueItem"] = []
125119
"""The scheduled events, sorted by time.
@@ -191,6 +185,10 @@ async def new_running_state_event_receiver(
191185
if not self._tasks:
192186
raise RuntimeError("Dispatch service not started")
193187

188+
(task,) = self._tasks
189+
if task.done():
190+
raise RuntimeError("Dispatch service not running")
191+
194192
# Find all matching dispatches based on the type and collect them
195193
dispatches = [
196194
dispatch for dispatch in self._dispatches.values() if dispatch.type == type
@@ -230,44 +228,59 @@ async def _run(self) -> None:
230228
self._microgrid_id,
231229
)
232230

233-
# Initial fetch
234-
await self._fetch()
235-
236-
stream = self._client.stream(microgrid_id=self._microgrid_id)
237-
238231
# Streaming updates
239-
async for selected in select(self._next_event_timer, stream):
240-
if selected_from(selected, self._next_event_timer):
241-
if not self._scheduled_events:
242-
continue
243-
await self._execute_scheduled_event(
244-
heappop(self._scheduled_events).dispatch
245-
)
246-
elif selected_from(selected, stream):
247-
_logger.debug("Received dispatch event: %s", selected.message)
248-
dispatch = Dispatch(selected.message.dispatch)
249-
match selected.message.event:
250-
case Event.CREATED:
251-
self._dispatches[dispatch.id] = dispatch
252-
await self._update_dispatch_schedule_and_notify(dispatch, None)
253-
await self._lifecycle_events_tx.send(Created(dispatch=dispatch))
254-
case Event.UPDATED:
255-
await self._update_dispatch_schedule_and_notify(
256-
dispatch, self._dispatches[dispatch.id]
257-
)
258-
self._dispatches[dispatch.id] = dispatch
259-
await self._lifecycle_events_tx.send(Updated(dispatch=dispatch))
260-
case Event.DELETED:
261-
self._dispatches.pop(dispatch.id)
262-
await self._update_dispatch_schedule_and_notify(None, dispatch)
263-
264-
await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
265-
266-
async def _execute_scheduled_event(self, dispatch: Dispatch) -> None:
232+
with closing(
233+
Timer(timedelta(seconds=100), SkipMissedAndResync(), auto_start=False)
234+
) as next_event_timer:
235+
# Initial fetch
236+
await self._fetch(next_event_timer)
237+
stream = self._client.stream(microgrid_id=self._microgrid_id)
238+
239+
async for selected in select(next_event_timer, stream):
240+
if selected_from(selected, next_event_timer):
241+
if not self._scheduled_events:
242+
continue
243+
await self._execute_scheduled_event(
244+
heappop(self._scheduled_events).dispatch, next_event_timer
245+
)
246+
elif selected_from(selected, stream):
247+
_logger.debug("Received dispatch event: %s", selected.message)
248+
dispatch = Dispatch(selected.message.dispatch)
249+
match selected.message.event:
250+
case Event.CREATED:
251+
self._dispatches[dispatch.id] = dispatch
252+
await self._update_dispatch_schedule_and_notify(
253+
dispatch, None, next_event_timer
254+
)
255+
await self._lifecycle_events_tx.send(
256+
Created(dispatch=dispatch)
257+
)
258+
case Event.UPDATED:
259+
await self._update_dispatch_schedule_and_notify(
260+
dispatch,
261+
self._dispatches[dispatch.id],
262+
next_event_timer,
263+
)
264+
self._dispatches[dispatch.id] = dispatch
265+
await self._lifecycle_events_tx.send(
266+
Updated(dispatch=dispatch)
267+
)
268+
case Event.DELETED:
269+
self._dispatches.pop(dispatch.id)
270+
await self._update_dispatch_schedule_and_notify(
271+
None, dispatch, next_event_timer
272+
)
273+
274+
await self._lifecycle_events_tx.send(
275+
Deleted(dispatch=dispatch)
276+
)
277+
278+
async def _execute_scheduled_event(self, dispatch: Dispatch, timer: Timer) -> None:
267279
"""Execute a scheduled event.
268280
269281
Args:
270282
dispatch: The dispatch to execute.
283+
timer: The timer to use for scheduling the next event.
271284
"""
272285
_logger.debug("Executing scheduled event: %s (%s)", dispatch, dispatch.started)
273286
await self._send_running_state_change(dispatch)
@@ -282,9 +295,9 @@ async def _execute_scheduled_event(self, dispatch: Dispatch) -> None:
282295
else:
283296
self._schedule_start(dispatch)
284297

285-
self._update_timer()
298+
self._update_timer(timer)
286299

287-
async def _fetch(self) -> None:
300+
async def _fetch(self, timer: Timer) -> None:
288301
"""Fetch all relevant dispatches using list.
289302
290303
This is used for the initial fetch and for re-fetching all dispatches
@@ -305,12 +318,14 @@ async def _fetch(self) -> None:
305318
old_dispatch = old_dispatches.pop(dispatch.id, None)
306319
if not old_dispatch:
307320
_logger.debug("New dispatch: %s", dispatch)
308-
await self._update_dispatch_schedule_and_notify(dispatch, None)
321+
await self._update_dispatch_schedule_and_notify(
322+
dispatch, None, timer
323+
)
309324
await self._lifecycle_events_tx.send(Created(dispatch=dispatch))
310325
elif dispatch.update_time != old_dispatch.update_time:
311326
_logger.debug("Updated dispatch: %s", dispatch)
312327
await self._update_dispatch_schedule_and_notify(
313-
dispatch, old_dispatch
328+
dispatch, old_dispatch, timer
314329
)
315330
await self._lifecycle_events_tx.send(Updated(dispatch=dispatch))
316331

@@ -324,7 +339,7 @@ async def _fetch(self) -> None:
324339
for dispatch in old_dispatches.values():
325340
_logger.debug("Deleted dispatch: %s", dispatch)
326341
await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
327-
await self._update_dispatch_schedule_and_notify(None, dispatch)
342+
await self._update_dispatch_schedule_and_notify(None, dispatch, timer)
328343

329344
# Set deleted only here as it influences the result of dispatch.started
330345
# which is used in above in _running_state_change
@@ -334,7 +349,7 @@ async def _fetch(self) -> None:
334349
self._initial_fetch_event.set()
335350

336351
async def _update_dispatch_schedule_and_notify(
337-
self, dispatch: Dispatch | None, old_dispatch: Dispatch | None
352+
self, dispatch: Dispatch | None, old_dispatch: Dispatch | None, timer: Timer
338353
) -> None:
339354
"""Update the schedule for a dispatch.
340355
@@ -350,6 +365,7 @@ async def _update_dispatch_schedule_and_notify(
350365
Args:
351366
dispatch: The dispatch to update the schedule for.
352367
old_dispatch: The old dispatch, if available.
368+
timer: The timer to use for scheduling the next event.
353369
"""
354370
# If dispatch is None, the dispatch was deleted
355371
# and we need to cancel any existing event for it
@@ -392,13 +408,13 @@ async def _update_dispatch_schedule_and_notify(
392408
self._schedule_start(dispatch)
393409

394410
# We modified the schedule, so we need to reset the timer
395-
self._update_timer()
411+
self._update_timer(timer)
396412

397-
def _update_timer(self) -> None:
413+
def _update_timer(self, timer: Timer) -> None:
398414
"""Update the timer to the next event."""
399415
if self._scheduled_events:
400416
due_at: datetime = self._scheduled_events[0].time
401-
self._next_event_timer.reset(interval=due_at - datetime.now(timezone.utc))
417+
timer.reset(interval=due_at - datetime.now(timezone.utc))
402418
_logger.debug("Next event scheduled at %s", self._scheduled_events[0].time)
403419

404420
def _remove_scheduled(self, dispatch: Dispatch) -> bool:

tests/test_frequenz_dispatch.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,7 @@ async def test_dispatch_new_but_finished(
461461
test_env.client.set_dispatches(test_env.microgrid_id, [finished_dispatch])
462462
await test_env.service.stop()
463463
test_env.service.start()
464+
464465
test_env = replace(
465466
test_env,
466467
lifecycle_events=test_env.service.new_lifecycle_events_receiver("TEST_TYPE"),
@@ -470,8 +471,8 @@ async def test_dispatch_new_but_finished(
470471
)
471472
),
472473
)
473-
474474
fake_time.shift(timedelta(seconds=1))
475+
475476
# Process the lifecycle event caused by the old dispatch at startup
476477
await test_env.lifecycle_events.receive()
477478

tests/test_managing_actor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414
import pytest
1515
import time_machine
1616
from frequenz.channels import Broadcast, Receiver, Sender
17+
from frequenz.client.common.microgrid.components import ComponentId
1718
from frequenz.client.dispatch import recurrence
1819
from frequenz.client.dispatch.recurrence import Frequency, RecurrenceRule
1920
from frequenz.client.dispatch.test.client import FakeClient
2021
from frequenz.client.dispatch.test.generator import DispatchGenerator
21-
from frequenz.client.microgrid import ComponentId
2222
from frequenz.sdk.actor import Actor
2323
from pytest import fixture
2424

0 commit comments

Comments
 (0)