Skip to content

Commit e5a76d2

Browse files
committed
Context timer
1 parent 8eb46d7 commit e5a76d2

File tree

3 files changed

+47
-37
lines changed

3 files changed

+47
-37
lines changed

src/frequenz/dispatch/_actor_dispatcher.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@
1212

1313
from frequenz.channels import Broadcast, Receiver, Sender, select
1414
from frequenz.channels.timer import SkipMissedAndDrift, Timer
15-
from frequenz.client.common.microgrid.components import ComponentCategory
16-
from frequenz.client.microgrid import ComponentId
15+
from frequenz.client.common.microgrid.components import ComponentCategory, ComponentId
1716
from frequenz.sdk.actor import Actor, BackgroundService
1817

1918
from ._dispatch import Dispatch

src/frequenz/dispatch/_bg_service.py

Lines changed: 42 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import logging
1111
from abc import ABC, abstractmethod
1212
from collections.abc import Mapping
13+
from contextlib import closing
1314
from dataclasses import dataclass, field
1415
from datetime import datetime, timedelta, timezone
1516
from heapq import heappop, heappush
@@ -20,7 +21,6 @@
2021
from frequenz.client.dispatch import DispatchApiClient
2122
from frequenz.client.dispatch.types import Event
2223
from frequenz.sdk.actor import BackgroundService
23-
from typing_extensions import override
2424

2525
from ._dispatch import Dispatch
2626
from ._event import Created, Deleted, DispatchEvent, Updated
@@ -192,6 +192,10 @@ async def new_running_state_event_receiver(
192192
if not self._tasks:
193193
raise RuntimeError("Dispatch service not started")
194194

195+
(task,) = self._tasks
196+
if task.done():
197+
raise RuntimeError("Dispatch service not running")
198+
195199
# Find all matching dispatches based on the type and collect them
196200
dispatches = [
197201
dispatch for dispatch in self._dispatches.values() if dispatch.type == type
@@ -220,15 +224,8 @@ async def new_running_state_event_receiver(
220224

221225
# pylint: enable=redefined-builtin
222226

223-
@override
224-
async def stop(self, msg: str | None = None) -> None:
225-
"""Stop the background service."""
226-
self._next_event_timer.close()
227-
await super().stop(msg)
228-
229227
def start(self) -> None:
230228
"""Start the background service."""
231-
self._next_event_timer.reset(interval=timedelta(seconds=1))
232229
self._tasks.add(asyncio.create_task(self._run()))
233230

234231
async def _run(self) -> None:
@@ -244,32 +241,43 @@ async def _run(self) -> None:
244241
stream = self._client.stream(microgrid_id=self._microgrid_id)
245242

246243
# Streaming updates
247-
async for selected in select(self._next_event_timer, stream):
248-
if selected_from(selected, self._next_event_timer):
249-
if not self._scheduled_events:
250-
continue
251-
await self._execute_scheduled_event(
252-
heappop(self._scheduled_events).dispatch
253-
)
254-
elif selected_from(selected, stream):
255-
_logger.debug("Received dispatch event: %s", selected.message)
256-
dispatch = Dispatch(selected.message.dispatch)
257-
match selected.message.event:
258-
case Event.CREATED:
259-
self._dispatches[dispatch.id] = dispatch
260-
await self._update_dispatch_schedule_and_notify(dispatch, None)
261-
await self._lifecycle_events_tx.send(Created(dispatch=dispatch))
262-
case Event.UPDATED:
263-
await self._update_dispatch_schedule_and_notify(
264-
dispatch, self._dispatches[dispatch.id]
265-
)
266-
self._dispatches[dispatch.id] = dispatch
267-
await self._lifecycle_events_tx.send(Updated(dispatch=dispatch))
268-
case Event.DELETED:
269-
self._dispatches.pop(dispatch.id)
270-
await self._update_dispatch_schedule_and_notify(None, dispatch)
271-
272-
await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
244+
with closing(self._next_event_timer) as next_event_timer:
245+
async for selected in select(next_event_timer, stream):
246+
if selected_from(selected, next_event_timer):
247+
if not self._scheduled_events:
248+
continue
249+
await self._execute_scheduled_event(
250+
heappop(self._scheduled_events).dispatch
251+
)
252+
elif selected_from(selected, stream):
253+
_logger.debug("Received dispatch event: %s", selected.message)
254+
dispatch = Dispatch(selected.message.dispatch)
255+
match selected.message.event:
256+
case Event.CREATED:
257+
self._dispatches[dispatch.id] = dispatch
258+
await self._update_dispatch_schedule_and_notify(
259+
dispatch, None
260+
)
261+
await self._lifecycle_events_tx.send(
262+
Created(dispatch=dispatch)
263+
)
264+
case Event.UPDATED:
265+
await self._update_dispatch_schedule_and_notify(
266+
dispatch, self._dispatches[dispatch.id]
267+
)
268+
self._dispatches[dispatch.id] = dispatch
269+
await self._lifecycle_events_tx.send(
270+
Updated(dispatch=dispatch)
271+
)
272+
case Event.DELETED:
273+
self._dispatches.pop(dispatch.id)
274+
await self._update_dispatch_schedule_and_notify(
275+
None, dispatch
276+
)
277+
278+
await self._lifecycle_events_tx.send(
279+
Deleted(dispatch=dispatch)
280+
)
273281

274282
async def _execute_scheduled_event(self, dispatch: Dispatch) -> None:
275283
"""Execute a scheduled event.

tests/test_frequenz_dispatch.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,9 @@ async def test_dispatch_new_but_finished(
461461
test_env.client.set_dispatches(test_env.microgrid_id, [finished_dispatch])
462462
await test_env.service.stop()
463463
test_env.service.start()
464+
fake_time.shift(timedelta(seconds=1))
465+
await asyncio.sleep(1)
466+
464467
test_env = replace(
465468
test_env,
466469
lifecycle_events=test_env.service.new_lifecycle_events_receiver("TEST_TYPE"),
@@ -470,8 +473,8 @@ async def test_dispatch_new_but_finished(
470473
)
471474
),
472475
)
476+
await asyncio.sleep(1)
473477

474-
fake_time.shift(timedelta(seconds=1))
475478
# Process the lifecycle event caused by the old dispatch at startup
476479
await test_env.lifecycle_events.receive()
477480

0 commit comments

Comments
 (0)