Skip to content

Commit e7514f4

Browse files
committed
Turn dispatch actor into background service
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 2b6e511 commit e7514f4

File tree

4 files changed

+32
-24
lines changed

4 files changed

+32
-24
lines changed
Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
"""The dispatch actor."""
55

6+
import asyncio
67
import logging
78
from dataclasses import dataclass, field
89
from datetime import datetime, timedelta, timezone
@@ -13,7 +14,7 @@
1314
from frequenz.channels.timer import SkipMissedAndResync, Timer
1415
from frequenz.client.dispatch import Client
1516
from frequenz.client.dispatch.types import Event
16-
from frequenz.sdk.actor import Actor
17+
from frequenz.sdk.actor import BackgroundService
1718

1819
from ._dispatch import Dispatch
1920
from ._event import Created, Deleted, DispatchEvent, Updated
@@ -23,13 +24,11 @@
2324

2425

2526
# pylint: disable=too-many-instance-attributes
26-
class DispatchingActor(Actor):
27-
"""Dispatch actor.
27+
class DispatchBackgroundService(BackgroundService):
28+
"""Dispatch background service.
2829
29-
This actor is responsible for handling dispatches for a microgrid.
30-
31-
This means staying in sync with the API and scheduling
32-
dispatches as necessary.
30+
This service is responsible for managing dispatches and scheduling them
31+
based on their start and stop times.
3332
"""
3433

3534
@dataclass(order=True)
@@ -52,7 +51,7 @@ def __init__(
5251
microgrid_id: int,
5352
client: Client,
5453
) -> None:
55-
"""Initialize the actor.
54+
"""Initialize the background service.
5655
5756
Args:
5857
microgrid_id: The microgrid ID to handle dispatches for.
@@ -81,7 +80,7 @@ def __init__(
8180
Interval is chosen arbitrarily, as it will be reset on the first event.
8281
"""
8382

84-
self._scheduled_events: list["DispatchingActor.QueueItem"] = []
83+
self._scheduled_events: list["DispatchBackgroundService.QueueItem"] = []
8584
"""The scheduled events, sorted by time.
8685
8786
Each event is a tuple of the scheduled time and the dispatch.
@@ -130,9 +129,16 @@ async def new_running_state_event_receiver(self, type: str) -> Receiver[Dispatch
130129

131130
# pylint: enable=redefined-builtin
132131

132+
def start(self) -> None:
133+
"""Start the background service."""
134+
self._tasks.add(asyncio.create_task(self._run()))
135+
133136
async def _run(self) -> None:
134-
"""Run the actor."""
135-
_logger.info("Starting dispatch actor for microgrid %s", self._microgrid_id)
137+
"""Run the background service."""
138+
_logger.info(
139+
"Starting dispatching background service for microgrid %s",
140+
self._microgrid_id,
141+
)
136142

137143
# Initial fetch
138144
await self._fetch()

src/frequenz/dispatch/_dispatcher.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
from frequenz.channels import Receiver
88
from frequenz.client.dispatch import Client
99

10+
from ._bg_service import DispatchBackgroundService
1011
from ._dispatch import Dispatch
1112
from ._event import DispatchEvent
12-
from .actor import DispatchingActor
1313

1414

1515
class Dispatcher:
@@ -172,14 +172,14 @@ def __init__(
172172
key: The key to access the service.
173173
"""
174174
self._client = Client(server_url=server_url, key=key)
175-
self._actor = DispatchingActor(
175+
self._bg_service = DispatchBackgroundService(
176176
microgrid_id,
177177
self._client,
178178
)
179179

180180
async def start(self) -> None:
181-
"""Start the actor."""
182-
self._actor.start()
181+
"""Start the local dispatch service."""
182+
self._bg_service.start()
183183

184184
@property
185185
def client(self) -> Client:
@@ -197,7 +197,7 @@ def new_lifecycle_events_receiver(
197197
Returns:
198198
A new receiver for new dispatches.
199199
"""
200-
return self._actor.new_lifecycle_events_receiver(type)
200+
return self._bg_service.new_lifecycle_events_receiver(dispatch_type)
201201

202202
async def new_running_state_event_receiver(
203203
self, dispatch_type: str
@@ -234,4 +234,4 @@ async def new_running_state_event_receiver(
234234
Returns:
235235
A new receiver for dispatches whose running status changed.
236236
"""
237-
return await self._actor.new_running_state_event_receiver(type)
237+
return await self._bg_service.new_running_state_event_receiver(dispatch_type)

tests/test_frequenz_dispatch.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from pytest import fixture
2020

2121
from frequenz.dispatch import Created, Deleted, Dispatch, DispatchEvent, Updated
22-
from frequenz.dispatch.actor import DispatchingActor
22+
from frequenz.dispatch._bg_service import DispatchBackgroundService
2323

2424

2525
@fixture
@@ -48,7 +48,7 @@ def _now() -> datetime:
4848
class ActorTestEnv:
4949
"""Test environment for the actor."""
5050

51-
actor: DispatchingActor
51+
actor: DispatchBackgroundService
5252
"""The actor under test."""
5353
lifecycle_events: Receiver[DispatchEvent]
5454
"""The receiver for updated dispatches."""
@@ -66,7 +66,7 @@ async def actor_env() -> AsyncIterator[ActorTestEnv]:
6666
microgrid_id = randint(1, 100)
6767
client = FakeClient()
6868

69-
actor = DispatchingActor(
69+
actor = DispatchBackgroundService(
7070
microgrid_id=microgrid_id,
7171
client=client,
7272
)

tests/test_mananging_actor.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from pytest import fixture
1919

2020
from frequenz.dispatch import Dispatch, DispatchManagingActor, DispatchUpdate
21-
from frequenz.dispatch.actor import DispatchingActor
21+
from frequenz.dispatch._bg_service import DispatchBackgroundService
2222

2323

2424
@fixture
@@ -142,14 +142,16 @@ def test_heapq_dispatch_compare(test_env: TestEnv) -> None:
142142
until_time = now + timedelta(minutes=5)
143143

144144
# Create the heap
145-
scheduled_events: list[DispatchingActor.QueueItem] = []
145+
scheduled_events: list[DispatchBackgroundService.QueueItem] = []
146146

147147
# Push two events with the same 'until' time onto the heap
148148
heapq.heappush(
149-
scheduled_events, DispatchingActor.QueueItem(until_time, Dispatch(dispatch1))
149+
scheduled_events,
150+
DispatchBackgroundService.QueueItem(until_time, Dispatch(dispatch1)),
150151
)
151152
heapq.heappush(
152-
scheduled_events, DispatchingActor.QueueItem(until_time, Dispatch(dispatch2))
153+
scheduled_events,
154+
DispatchBackgroundService.QueueItem(until_time, Dispatch(dispatch2)),
153155
)
154156

155157

0 commit comments

Comments
 (0)