Skip to content

Commit fa4ab8e

Browse files
committed
Use MicrogridId, DispatchId and implement DispatchActorId
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent bd03484 commit fa4ab8e

File tree

6 files changed

+79
-42
lines changed

6 files changed

+79
-42
lines changed

src/frequenz/dispatch/_actor_dispatcher.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
from frequenz.channels import Broadcast, Receiver, Sender, select
1414
from frequenz.channels.timer import SkipMissedAndDrift, Timer
1515
from frequenz.client.common.microgrid.components import ComponentCategory, ComponentId
16+
from frequenz.client.dispatch.types import DispatchId
1617
from frequenz.client.dispatch.types import TargetComponents as ClientTargetComponents
18+
from frequenz.core.id import BaseId
1719
from frequenz.sdk.actor import Actor, BackgroundService
1820

1921
from ._dispatch import Dispatch
@@ -27,6 +29,18 @@
2729
"""
2830

2931

32+
class DispatchActorId(BaseId, str_prefix="DA"):
33+
"""ID for a dispatch actor."""
34+
35+
def __init__(self, dispatch_id: DispatchId | int) -> None:
36+
"""Initialize the DispatchActorId.
37+
38+
Args:
39+
dispatch_id: The ID of the dispatch this actor is associated with.
40+
"""
41+
super().__init__(int(dispatch_id))
42+
43+
3044
@dataclass(frozen=True, kw_only=True)
3145
class DispatchInfo:
3246
"""Event emitted when the dispatch changes."""
@@ -162,7 +176,7 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
162176
[DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor]
163177
],
164178
running_status_receiver: Receiver[Dispatch],
165-
dispatch_identity: Callable[[Dispatch], int] | None = None,
179+
dispatch_identity: Callable[[Dispatch], DispatchActorId] | None = None,
166180
retry_interval: timedelta = timedelta(seconds=60),
167181
) -> None:
168182
"""Initialize the dispatch handler.
@@ -176,15 +190,15 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
176190
retry_interval: How long to wait until trying to start failed actors again.
177191
"""
178192
super().__init__()
179-
self._dispatch_identity: Callable[[Dispatch], int] = (
180-
dispatch_identity if dispatch_identity else lambda d: d.id
193+
self._dispatch_identity: Callable[[Dispatch], DispatchActorId] = (
194+
dispatch_identity if dispatch_identity else lambda d: DispatchActorId(d.id)
181195
)
182196

183197
self._dispatch_rx = running_status_receiver
184198
self._retry_timer_rx = Timer(retry_interval, SkipMissedAndDrift())
185199
self._actor_factory = actor_factory
186-
self._actors: dict[int, ActorDispatcher.ActorAndChannel] = {}
187-
self._failed_dispatches: dict[int, Dispatch] = {}
200+
self._actors: dict[DispatchActorId, ActorDispatcher.ActorAndChannel] = {}
201+
self._failed_dispatches: dict[DispatchActorId, Dispatch] = {}
188202
"""Failed dispatches that will be retried later."""
189203

190204
def start(self) -> None:

src/frequenz/dispatch/_bg_service.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@
2323
StreamRetrying,
2424
StreamStarted,
2525
)
26+
from frequenz.client.common.microgrid import MicrogridId
2627
from frequenz.client.dispatch import DispatchApiClient
2728
from frequenz.client.dispatch.types import DispatchEvent as ApiDispatchEvent
28-
from frequenz.client.dispatch.types import Event
29+
from frequenz.client.dispatch.types import DispatchId, Event
2930
from frequenz.sdk.actor import BackgroundService
3031

32+
from ._actor_dispatcher import DispatchActorId
3133
from ._dispatch import Dispatch
3234
from ._event import Created, Deleted, DispatchEvent, Updated
3335

@@ -39,11 +41,13 @@ class MergeStrategy(ABC):
3941
"""Base class for strategies to merge running intervals."""
4042

4143
@abstractmethod
42-
def identity(self, dispatch: Dispatch) -> int:
44+
def identity(self, dispatch: Dispatch) -> DispatchActorId:
4345
"""Identity function for the merge criteria."""
4446

4547
@abstractmethod
46-
def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool:
48+
def filter(
49+
self, dispatches: Mapping[DispatchId, Dispatch], dispatch: Dispatch
50+
) -> bool:
4751
"""Filter dispatches based on the strategy.
4852
4953
Args:
@@ -81,7 +85,7 @@ class QueueItem:
8185
to consider the start event when deciding whether to execute the
8286
stop event.
8387
"""
84-
dispatch_id: int
88+
dispatch_id: DispatchId
8589
dispatch: Dispatch = field(compare=False)
8690

8791
def __init__(
@@ -96,7 +100,7 @@ def __init__(
96100
# pylint: disable=too-many-arguments
97101
def __init__(
98102
self,
99-
microgrid_id: int,
103+
microgrid_id: MicrogridId,
100104
client: DispatchApiClient,
101105
) -> None:
102106
"""Initialize the background service.
@@ -108,7 +112,7 @@ def __init__(
108112
super().__init__(name="dispatch")
109113

110114
self._client = client
111-
self._dispatches: dict[int, Dispatch] = {}
115+
self._dispatches: dict[DispatchId, Dispatch] = {}
112116
self._microgrid_id = microgrid_id
113117

114118
self._lifecycle_events_channel = Broadcast[DispatchEvent](

src/frequenz/dispatch/_dispatcher.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@
1212
from typing import Awaitable, Callable, Self
1313

1414
from frequenz.channels import Receiver
15+
from frequenz.client.common.microgrid import MicrogridId
1516
from frequenz.client.dispatch import DispatchApiClient
1617
from frequenz.sdk.actor import Actor, BackgroundService
1718
from typing_extensions import override
1819

19-
from ._actor_dispatcher import ActorDispatcher, DispatchInfo
20+
from ._actor_dispatcher import ActorDispatcher, DispatchActorId, DispatchInfo
2021
from ._bg_service import DispatchScheduler, MergeStrategy
2122
from ._dispatch import Dispatch
2223
from ._event import DispatchEvent
@@ -202,7 +203,7 @@ async def run():
202203
def __init__(
203204
self,
204205
*,
205-
microgrid_id: int,
206+
microgrid_id: MicrogridId,
206207
server_url: str,
207208
key: str,
208209
call_timeout: timedelta = timedelta(seconds=60),
@@ -328,8 +329,8 @@ async def start_managing(
328329

329330
self._empty_event.clear()
330331

331-
def id_identity(dispatch: Dispatch) -> int:
332-
return dispatch.id
332+
def id_identity(dispatch: Dispatch) -> DispatchActorId:
333+
return DispatchActorId(dispatch.id)
333334

334335
dispatcher = ActorDispatcher(
335336
actor_factory=actor_factory,

src/frequenz/dispatch/_merge_strategies.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,34 @@
55

66
import logging
77
from collections.abc import Mapping
8+
from sys import maxsize
9+
from typing import Any
810

11+
from frequenz.client.dispatch.types import DispatchId
912
from typing_extensions import override
1013

14+
from ._actor_dispatcher import DispatchActorId
1115
from ._bg_service import MergeStrategy
1216
from ._dispatch import Dispatch
1317

1418

19+
def _hash_positive(args: Any) -> int:
20+
"""Make a positive hash."""
21+
return hash(args) + maxsize + 1
22+
23+
1524
class MergeByType(MergeStrategy):
1625
"""Merge running intervals based on the dispatch type."""
1726

1827
@override
19-
def identity(self, dispatch: Dispatch) -> int:
28+
def identity(self, dispatch: Dispatch) -> DispatchActorId:
2029
"""Identity function for the merge criteria."""
21-
return hash(dispatch.type)
30+
return DispatchActorId(_hash_positive(dispatch.type))
2231

2332
@override
24-
def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool:
33+
def filter(
34+
self, dispatches: Mapping[DispatchId, Dispatch], dispatch: Dispatch
35+
) -> bool:
2536
"""Filter dispatches based on the merge strategy.
2637
2738
Keeps start events.
@@ -53,6 +64,6 @@ class MergeByTypeTarget(MergeByType):
5364
"""Merge running intervals based on the dispatch type and target."""
5465

5566
@override
56-
def identity(self, dispatch: Dispatch) -> int:
67+
def identity(self, dispatch: Dispatch) -> DispatchActorId:
5768
"""Identity function for the merge criteria."""
58-
return hash((dispatch.type, tuple(dispatch.target)))
69+
return DispatchActorId(_hash_positive((dispatch.type, tuple(dispatch.target))))

tests/test_frequenz_dispatch.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import pytest
1414
import time_machine
1515
from frequenz.channels import Receiver
16+
from frequenz.client.common.microgrid import MicrogridId
1617
from frequenz.client.dispatch.recurrence import Frequency, RecurrenceRule
1718
from frequenz.client.dispatch.test.client import FakeClient, to_create_params
1819
from frequenz.client.dispatch.test.generator import DispatchGenerator
@@ -67,14 +68,14 @@ class _TestEnv:
6768
"""The receiver for ready dispatches."""
6869
client: FakeClient
6970
"""The fake client for the actor."""
70-
microgrid_id: int
71+
microgrid_id: MicrogridId
7172
"""The microgrid id."""
7273

7374

7475
@fixture
7576
async def test_env() -> AsyncIterator[_TestEnv]:
7677
"""Return an actor test environment."""
77-
microgrid_id = randint(1, 100)
78+
microgrid_id = MicrogridId(randint(1, 100))
7879
client = FakeClient()
7980

8081
service = DispatchScheduler(
@@ -475,11 +476,10 @@ async def test_dispatch_new_but_finished(
475476
)
476477
fake_time.shift(timedelta(seconds=1))
477478

479+
await asyncio.sleep(1)
478480
# Process the lifecycle event caused by the old dispatch at startup
479481
await test_env.lifecycle_events.receive()
480482

481-
await asyncio.sleep(1)
482-
483483
# Create another dispatch the normal way
484484
new_dispatch = generator.generate_dispatch()
485485
new_dispatch = replace(
@@ -551,7 +551,7 @@ async def test_multiple_dispatches_merge_running_intervals(
551551
merge_strategy: MergeStrategy,
552552
) -> None:
553553
"""Test that multiple dispatches are merged into a single running interval."""
554-
microgrid_id = randint(1, 100)
554+
microgrid_id = MicrogridId(randint(1, 100))
555555
client = FakeClient()
556556
service = DispatchScheduler(
557557
microgrid_id=microgrid_id,
@@ -633,7 +633,7 @@ async def test_multiple_dispatches_sequential_intervals_merge(
633633
Even if dispatches don't overlap but are consecutive,
634634
merge_running_intervals=TPYE should treat them as continuous if any event tries to stop.
635635
"""
636-
microgrid_id = randint(1, 100)
636+
microgrid_id = MicrogridId(randint(1, 100))
637637
client = FakeClient()
638638
service = DispatchScheduler(microgrid_id=microgrid_id, client=client)
639639
service.start()
@@ -699,7 +699,7 @@ async def test_at_least_one_running_filter(
699699
merge_strategy: MergeStrategy,
700700
) -> None:
701701
"""Test scenarios directly tied to the _at_least_one_running logic."""
702-
microgrid_id = randint(1, 100)
702+
microgrid_id = MicrogridId(randint(1, 100))
703703
client = FakeClient()
704704
service = DispatchScheduler(microgrid_id=microgrid_id, client=client)
705705
service.start()

0 commit comments

Comments
 (0)