Skip to content

Commit 1c88c58

Browse files
authored
Interface enhancements (frequenz-floss#89)
- **Replace channel properties with receiver functions** - **Tests: Prevent inf actor restarts on exceptions** This also fixes frequenz-floss#87
2 parents e4334c8 + c1606ae commit 1c88c58

File tree

7 files changed

+223
-186
lines changed

7 files changed

+223
-186
lines changed

README.md

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,9 @@ async def run():
3939

4040
actor = MagicMock() # replace with your actor
4141

42-
changed_running_status_rx = dispatcher.running_status_change.new_receiver()
42+
changed_running_status_rx = dispatcher.new_running_state_event_receiver("MY_TYPE")
4343

4444
async for dispatch in changed_running_status_rx:
45-
if dispatch.type != "MY_TYPE":
46-
continue
47-
4845
if dispatch.started:
4946
print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
5047
if actor.is_running:

RELEASE_NOTES.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@
66

77
## Upgrading
88

9-
* The method `Dispatch.running(type: str)` was replaced with the property `Dispatch.started: bool`.
10-
* The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1500
9+
* Two properties have been replaced by methods that require a type as parameter.
10+
* `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, type: str)`.
11+
* `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, type: str)`.
1112

1213
## New Features
1314

1415
<!-- Here goes the main new features and examples or instructions on how to use them -->
1516

1617
## Bug Fixes
1718

18-
* Fixed a crash when reading a Dispatch with frequency YEARLY.
19+
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->

src/frequenz/dispatch/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
"""
1717

1818
from ._dispatch import Dispatch
19-
from ._dispatcher import Dispatcher, ReceiverFetcher
19+
from ._dispatcher import Dispatcher
2020
from ._event import Created, Deleted, DispatchEvent, Updated
2121
from ._managing_actor import DispatchManagingActor, DispatchUpdate
2222

@@ -25,7 +25,6 @@
2525
"Deleted",
2626
"DispatchEvent",
2727
"Dispatcher",
28-
"ReceiverFetcher",
2928
"Updated",
3029
"Dispatch",
3130
"DispatchManagingActor",

src/frequenz/dispatch/actor.py renamed to src/frequenz/dispatch/_bg_service.py

Lines changed: 79 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
# License: MIT
22
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
33

4-
"""The dispatch actor."""
4+
"""The dispatch background service."""
55

6+
import asyncio
67
import logging
78
from dataclasses import dataclass, field
89
from datetime import datetime, timedelta, timezone
910
from heapq import heappop, heappush
1011

1112
import grpc.aio
12-
from frequenz.channels import Sender, select, selected_from
13+
from frequenz.channels import Broadcast, Receiver, select, selected_from
1314
from frequenz.channels.timer import SkipMissedAndResync, Timer
1415
from frequenz.client.dispatch import Client
1516
from frequenz.client.dispatch.types import Event
16-
from frequenz.sdk.actor import Actor
17+
from frequenz.sdk.actor import BackgroundService
1718

1819
from ._dispatch import Dispatch
1920
from ._event import Created, Deleted, DispatchEvent, Updated
@@ -22,13 +23,12 @@
2223
"""The logger for this module."""
2324

2425

25-
class DispatchingActor(Actor):
26-
"""Dispatch actor.
26+
# pylint: disable=too-many-instance-attributes
27+
class DispatchScheduler(BackgroundService):
28+
"""Dispatch background service.
2729
28-
This actor is responsible for handling dispatches for a microgrid.
29-
30-
This means staying in sync with the API and scheduling
31-
dispatches as necessary.
30+
This service is responsible for managing dispatches and scheduling them
31+
based on their start and stop times.
3232
"""
3333

3434
@dataclass(order=True)
@@ -50,24 +50,28 @@ def __init__(
5050
self,
5151
microgrid_id: int,
5252
client: Client,
53-
lifecycle_updates_sender: Sender[DispatchEvent],
54-
running_state_change_sender: Sender[Dispatch],
5553
) -> None:
56-
"""Initialize the actor.
54+
"""Initialize the background service.
5755
5856
Args:
5957
microgrid_id: The microgrid ID to handle dispatches for.
6058
client: The client to use for fetching dispatches.
61-
lifecycle_updates_sender: A sender for dispatch lifecycle events.
62-
running_state_change_sender: A sender for dispatch running state changes.
6359
"""
6460
super().__init__(name="dispatch")
6561

6662
self._client = client
6763
self._dispatches: dict[int, Dispatch] = {}
6864
self._microgrid_id = microgrid_id
69-
self._lifecycle_updates_sender = lifecycle_updates_sender
70-
self._running_state_change_sender = running_state_change_sender
65+
66+
self._lifecycle_events_channel = Broadcast[DispatchEvent](
67+
name="lifecycle_events"
68+
)
69+
self._lifecycle_events_tx = self._lifecycle_events_channel.new_sender()
70+
self._running_state_status_channel = Broadcast[Dispatch](
71+
name="running_state_status"
72+
)
73+
74+
self._running_state_status_tx = self._running_state_status_channel.new_sender()
7175
self._next_event_timer = Timer(
7276
timedelta(seconds=100), SkipMissedAndResync(), auto_start=False
7377
)
@@ -76,17 +80,65 @@ def __init__(
7680
Interval is chosen arbitrarily, as it will be reset on the first event.
7781
"""
7882

79-
self._scheduled_events: list["DispatchingActor.QueueItem"] = []
83+
self._scheduled_events: list["DispatchScheduler.QueueItem"] = []
8084
"""The scheduled events, sorted by time.
8185
8286
Each event is a tuple of the scheduled time and the dispatch.
8387
heapq is used to keep the list sorted by time, so the next event is
8488
always at index 0.
8589
"""
8690

91+
# pylint: disable=redefined-builtin
92+
def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
93+
"""Create a new receiver for lifecycle events.
94+
95+
Args:
96+
type: The type of events to receive.
97+
98+
Returns:
99+
A new receiver for lifecycle events.
100+
"""
101+
return self._lifecycle_events_channel.new_receiver().filter(
102+
lambda event: event.dispatch.type == type
103+
)
104+
105+
async def new_running_state_event_receiver(self, type: str) -> Receiver[Dispatch]:
106+
"""Create a new receiver for running state events.
107+
108+
Args:
109+
type: The type of events to receive.
110+
111+
Returns:
112+
A new receiver for running state status.
113+
"""
114+
# Find all matching dispatches based on the type and collect them
115+
dispatches = [
116+
dispatch for dispatch in self._dispatches.values() if dispatch.type == type
117+
]
118+
119+
# Create receiver with enough capacity to hold all matching dispatches
120+
receiver = self._running_state_status_channel.new_receiver(
121+
limit=max(1, len(dispatches))
122+
).filter(lambda dispatch: dispatch.type == type)
123+
124+
# Send all matching dispatches to the receiver
125+
for dispatch in dispatches:
126+
await self._send_running_state_change(dispatch)
127+
128+
return receiver
129+
130+
# pylint: enable=redefined-builtin
131+
132+
def start(self) -> None:
133+
"""Start the background service."""
134+
self._tasks.add(asyncio.create_task(self._run()))
135+
87136
async def _run(self) -> None:
88-
"""Run the actor."""
89-
_logger.info("Starting dispatch actor for microgrid %s", self._microgrid_id)
137+
"""Run the background service."""
138+
_logger.info(
139+
"Starting dispatching background service for microgrid %s",
140+
self._microgrid_id,
141+
)
90142

91143
# Initial fetch
92144
await self._fetch()
@@ -111,24 +163,18 @@ async def _run(self) -> None:
111163
case Event.CREATED:
112164
self._dispatches[dispatch.id] = dispatch
113165
await self._update_dispatch_schedule_and_notify(dispatch, None)
114-
await self._lifecycle_updates_sender.send(
115-
Created(dispatch=dispatch)
116-
)
166+
await self._lifecycle_events_tx.send(Created(dispatch=dispatch))
117167
case Event.UPDATED:
118168
await self._update_dispatch_schedule_and_notify(
119169
dispatch, self._dispatches[dispatch.id]
120170
)
121171
self._dispatches[dispatch.id] = dispatch
122-
await self._lifecycle_updates_sender.send(
123-
Updated(dispatch=dispatch)
124-
)
172+
await self._lifecycle_events_tx.send(Updated(dispatch=dispatch))
125173
case Event.DELETED:
126174
self._dispatches.pop(dispatch.id)
127175
await self._update_dispatch_schedule_and_notify(None, dispatch)
128176

129-
await self._lifecycle_updates_sender.send(
130-
Deleted(dispatch=dispatch)
131-
)
177+
await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
132178

133179
async def _execute_scheduled_event(self, dispatch: Dispatch) -> None:
134180
"""Execute a scheduled event.
@@ -170,17 +216,13 @@ async def _fetch(self) -> None:
170216
if not old_dispatch:
171217
_logger.info("New dispatch: %s", dispatch)
172218
await self._update_dispatch_schedule_and_notify(dispatch, None)
173-
await self._lifecycle_updates_sender.send(
174-
Created(dispatch=dispatch)
175-
)
219+
await self._lifecycle_events_tx.send(Created(dispatch=dispatch))
176220
elif dispatch.update_time != old_dispatch.update_time:
177221
_logger.info("Updated dispatch: %s", dispatch)
178222
await self._update_dispatch_schedule_and_notify(
179223
dispatch, old_dispatch
180224
)
181-
await self._lifecycle_updates_sender.send(
182-
Updated(dispatch=dispatch)
183-
)
225+
await self._lifecycle_events_tx.send(Updated(dispatch=dispatch))
184226

185227
except grpc.aio.AioRpcError as error:
186228
_logger.error("Error fetching dispatches: %s", error)
@@ -189,13 +231,13 @@ async def _fetch(self) -> None:
189231

190232
for dispatch in old_dispatches.values():
191233
_logger.info("Deleted dispatch: %s", dispatch)
192-
await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch))
234+
await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
193235
await self._update_dispatch_schedule_and_notify(None, dispatch)
194236

195237
# Set deleted only here as it influences the result of dispatch.started
196238
# which is used in above in _running_state_change
197239
dispatch._set_deleted() # pylint: disable=protected-access
198-
await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch))
240+
await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
199241

200242
async def _update_dispatch_schedule_and_notify(
201243
self, dispatch: Dispatch | None, old_dispatch: Dispatch | None
@@ -359,4 +401,4 @@ async def _send_running_state_change(self, dispatch: Dispatch) -> None:
359401
Args:
360402
dispatch: The dispatch that changed.
361403
"""
362-
await self._running_state_change_sender.send(dispatch)
404+
await self._running_state_status_tx.send(dispatch)

0 commit comments

Comments
 (0)