Commit c7ee3cc

Fetch dispatches in sync with periodic reconnect of client.stream (#155)
Should probably use frequenz-floss/frequenz-client-base-python#141 eventually to be less hacky?
2 parents: a3332ea + fa4ab8e

File tree: 8 files changed, +179 -92 lines

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions

@@ -13,6 +13,7 @@
 * The dispatcher offers two new parameters to control the client's call and stream timeout:
   - `call_timeout`: The maximum time to wait for a response from the client.
   - `stream_timeout`: The maximum time to wait before restarting a stream.
+* While the dispatch stream restarts we refresh our dispatch cache as well, to ensure we didn't miss any updates.
 
 ## Bug Fixes
 

pyproject.toml

Lines changed: 4 additions & 2 deletions

@@ -38,9 +38,11 @@ dependencies = [
   # Make sure to update the version for cross-referencing also in the
   # mkdocs.yml file when changing the version here (look for the config key
   # plugins.mkdocstrings.handlers.python.import)
-  "frequenz-sdk >= 1.0.0-rc1900, < 1.0.0-rc2100",
+  "frequenz-sdk >= 1.0.0-rc2002, < 1.0.0-rc2100",
   "frequenz-channels >= 1.6.1, < 2.0.0",
-  "frequenz-client-dispatch >= 0.10.2, < 0.11.0",
+  "frequenz-client-dispatch >= 0.11.0, < 0.12.0",
+  "frequenz-client-common >= 0.3.2, < 0.4.0",
+  "frequenz-client-base >= 0.11.0, < 0.12.0",
 ]
 dynamic = ["version"]
 
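The bump also adds frequenz-client-common and frequenz-client-base as direct dependencies. A small convenience snippet, not part of the commit, to check what is actually installed against the new pins (package names taken from the list above):

from importlib.metadata import version

# Raises importlib.metadata.PackageNotFoundError for anything not installed.
for package in (
    "frequenz-sdk",
    "frequenz-channels",
    "frequenz-client-dispatch",
    "frequenz-client-common",
    "frequenz-client-base",
):
    print(package, version(package))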

src/frequenz/dispatch/_actor_dispatcher.py

Lines changed: 31 additions & 17 deletions

@@ -13,6 +13,9 @@
 from frequenz.channels import Broadcast, Receiver, Sender, select
 from frequenz.channels.timer import SkipMissedAndDrift, Timer
 from frequenz.client.common.microgrid.components import ComponentCategory, ComponentId
+from frequenz.client.dispatch.types import DispatchId
+from frequenz.client.dispatch.types import TargetComponents as ClientTargetComponents
+from frequenz.core.id import BaseId
 from frequenz.sdk.actor import Actor, BackgroundService
 
 from ._dispatch import Dispatch
@@ -26,6 +29,18 @@
 """
 
 
+class DispatchActorId(BaseId, str_prefix="DA"):
+    """ID for a dispatch actor."""
+
+    def __init__(self, dispatch_id: DispatchId | int) -> None:
+        """Initialize the DispatchActorId.
+
+        Args:
+            dispatch_id: The ID of the dispatch this actor is associated with.
+        """
+        super().__init__(int(dispatch_id))
+
+
 @dataclass(frozen=True, kw_only=True)
 class DispatchInfo:
     """Event emitted when the dispatch changes."""
@@ -161,7 +176,7 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
             [DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor]
         ],
         running_status_receiver: Receiver[Dispatch],
-        dispatch_identity: Callable[[Dispatch], int] | None = None,
+        dispatch_identity: Callable[[Dispatch], DispatchActorId] | None = None,
         retry_interval: timedelta = timedelta(seconds=60),
     ) -> None:
         """Initialize the dispatch handler.
@@ -175,36 +190,25 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
             retry_interval: How long to wait until trying to start failed actors again.
         """
         super().__init__()
-        self._dispatch_identity: Callable[[Dispatch], int] = (
-            dispatch_identity if dispatch_identity else lambda d: d.id
+        self._dispatch_identity: Callable[[Dispatch], DispatchActorId] = (
+            dispatch_identity if dispatch_identity else lambda d: DispatchActorId(d.id)
         )
 
         self._dispatch_rx = running_status_receiver
         self._retry_timer_rx = Timer(retry_interval, SkipMissedAndDrift())
         self._actor_factory = actor_factory
-        self._actors: dict[int, ActorDispatcher.ActorAndChannel] = {}
-        self._failed_dispatches: dict[int, Dispatch] = {}
+        self._actors: dict[DispatchActorId, ActorDispatcher.ActorAndChannel] = {}
+        self._failed_dispatches: dict[DispatchActorId, Dispatch] = {}
         """Failed dispatches that will be retried later."""
 
     def start(self) -> None:
         """Start the background service."""
         self._tasks.add(asyncio.create_task(self._run()))
 
-    def _get_target_components_from_dispatch(
-        self, dispatch: Dispatch
-    ) -> TargetComponents:
-        if all(isinstance(comp, int) for comp in dispatch.target):
-            # We've confirmed all elements are integers, so we can cast.
-            int_components = cast(list[int], dispatch.target)
-            return [ComponentId(cid) for cid in int_components]
-        # If not all are ints, then it must be a list of ComponentCategory
-        # based on the definition of ClientTargetComponents.
-        return cast(list[ComponentCategory], dispatch.target)
-
     async def _start_actor(self, dispatch: Dispatch) -> None:
         """Start the actor the given dispatch refers to."""
         dispatch_update = DispatchInfo(
-            components=self._get_target_components_from_dispatch(dispatch),
+            components=_convert_target_components(dispatch.target),
             dry_run=dispatch.dry_run,
             options=dispatch.payload,
             _src=dispatch,
@@ -301,3 +305,13 @@ async def _handle_dispatch(self, dispatch: Dispatch) -> None:
             await self._start_actor(dispatch)
         else:
             await self._stop_actor(dispatch, "Dispatch stopped")
+
+
+def _convert_target_components(target: ClientTargetComponents) -> TargetComponents:
+    if all(isinstance(comp, int) for comp in target):
+        # We've confirmed all elements are integers, so we can cast.
+        int_components = cast(list[int], target)
+        return [ComponentId(cid) for cid in int_components]
+    # If not all are ints, then it must be a list of ComponentCategory
+    # based on the definition of ClientTargetComponents.
+    return cast(list[ComponentCategory], target)
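A hedged user-side sketch of what this change means for callers of ActorDispatcher (not part of the commit; the internal import paths are taken from the diff, and whether DispatchActorId is also re-exported from the package root is an assumption): custom dispatch_identity callables must now return a DispatchActorId instead of a plain int.

from frequenz.dispatch._actor_dispatcher import DispatchActorId
from frequenz.dispatch._dispatch import Dispatch


def one_actor_per_dispatch(dispatch: Dispatch) -> DispatchActorId:
    """Mirror the new default identity: one actor per dispatch ID."""
    # DispatchActorId accepts a DispatchId or a plain int (see __init__ above).
    return DispatchActorId(dispatch.id)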

src/frequenz/dispatch/_bg_service.py

Lines changed: 77 additions & 36 deletions

@@ -18,10 +18,18 @@
 import grpc.aio
 from frequenz.channels import Broadcast, Receiver, select, selected_from
 from frequenz.channels.timer import SkipMissedAndResync, Timer
+from frequenz.client.base.streaming import (
+    StreamFatalError,
+    StreamRetrying,
+    StreamStarted,
+)
+from frequenz.client.common.microgrid import MicrogridId
 from frequenz.client.dispatch import DispatchApiClient
-from frequenz.client.dispatch.types import Event
+from frequenz.client.dispatch.types import DispatchEvent as ApiDispatchEvent
+from frequenz.client.dispatch.types import DispatchId, Event
 from frequenz.sdk.actor import BackgroundService
 
+from ._actor_dispatcher import DispatchActorId
 from ._dispatch import Dispatch
 from ._event import Created, Deleted, DispatchEvent, Updated
 
@@ -33,11 +41,13 @@ class MergeStrategy(ABC):
     """Base class for strategies to merge running intervals."""
 
     @abstractmethod
-    def identity(self, dispatch: Dispatch) -> int:
+    def identity(self, dispatch: Dispatch) -> DispatchActorId:
         """Identity function for the merge criteria."""
 
     @abstractmethod
-    def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool:
+    def filter(
+        self, dispatches: Mapping[DispatchId, Dispatch], dispatch: Dispatch
+    ) -> bool:
         """Filter dispatches based on the strategy.
 
         Args:
@@ -75,7 +85,7 @@ class QueueItem:
         to consider the start event when deciding whether to execute the
         stop event.
         """
-        dispatch_id: int
+        dispatch_id: DispatchId
         dispatch: Dispatch = field(compare=False)
 
         def __init__(
@@ -90,7 +100,7 @@ def __init__(
     # pylint: disable=too-many-arguments
    def __init__(
         self,
-        microgrid_id: int,
+        microgrid_id: MicrogridId,
         client: DispatchApiClient,
     ) -> None:
         """Initialize the background service.
@@ -102,7 +112,7 @@ def __init__(
         super().__init__(name="dispatch")
 
         self._client = client
-        self._dispatches: dict[int, Dispatch] = {}
+        self._dispatches: dict[DispatchId, Dispatch] = {}
         self._microgrid_id = microgrid_id
 
         self._lifecycle_events_channel = Broadcast[DispatchEvent](
@@ -230,8 +240,21 @@ async def _run(self) -> None:
         ) as next_event_timer:
             # Initial fetch
             await self._fetch(next_event_timer)
-            stream = self._client.stream(microgrid_id=self._microgrid_id)
 
+            # pylint: disable-next=protected-access
+            streamer = self._client._get_stream(microgrid_id=self._microgrid_id)
+            stream = streamer.new_receiver(include_events=True)
+
+            # We track stream start events linked to retries to avoid re-fetching
+            # dispatches that were already retrieved during an initial stream start.
+            # The initial fetch gets all dispatches, and the StreamStarted event
+            # isn't always reliable due to parallel receiver creation and stream
+            # task initiation.
+            # This way we get a deterministic behavior where we only fetch
+            # dispatches once initially and then only when the stream is restarted.
+            is_retry_attempt = False
+
+            # Streaming updates
             async for selected in select(next_event_timer, stream):
                 if selected_from(selected, next_event_timer):
                     if not self._scheduled_events:
@@ -240,36 +263,54 @@ async def _run(self) -> None:
                             heappop(self._scheduled_events).dispatch, next_event_timer
                         )
                 elif selected_from(selected, stream):
-                    _logger.debug("Received dispatch event: %s", selected.message)
-                    dispatch = Dispatch(selected.message.dispatch)
-                    match selected.message.event:
-                        case Event.CREATED:
-                            self._dispatches[dispatch.id] = dispatch
-                            await self._update_dispatch_schedule_and_notify(
-                                dispatch, None, next_event_timer
-                            )
-                            await self._lifecycle_events_tx.send(
-                                Created(dispatch=dispatch)
-                            )
-                        case Event.UPDATED:
-                            await self._update_dispatch_schedule_and_notify(
-                                dispatch,
-                                self._dispatches[dispatch.id],
-                                next_event_timer,
-                            )
-                            self._dispatches[dispatch.id] = dispatch
-                            await self._lifecycle_events_tx.send(
-                                Updated(dispatch=dispatch)
-                            )
-                        case Event.DELETED:
-                            self._dispatches.pop(dispatch.id)
-                            await self._update_dispatch_schedule_and_notify(
-                                None, dispatch, next_event_timer
-                            )
-
-                            await self._lifecycle_events_tx.send(
-                                Deleted(dispatch=dispatch)
+                    match selected.message:
+                        case ApiDispatchEvent():
+                            _logger.debug(
+                                "Received dispatch event: %s", selected.message
                             )
+                            dispatch = Dispatch(selected.message.dispatch)
+                            match selected.message.event:
+                                case Event.CREATED:
+                                    self._dispatches[dispatch.id] = dispatch
+                                    await self._update_dispatch_schedule_and_notify(
+                                        dispatch, None, next_event_timer
+                                    )
+                                    await self._lifecycle_events_tx.send(
+                                        Created(dispatch=dispatch)
+                                    )
+                                case Event.UPDATED:
+                                    await self._update_dispatch_schedule_and_notify(
+                                        dispatch,
+                                        self._dispatches[dispatch.id],
+                                        next_event_timer,
+                                    )
+                                    self._dispatches[dispatch.id] = dispatch
+                                    await self._lifecycle_events_tx.send(
+                                        Updated(dispatch=dispatch)
+                                    )
+                                case Event.DELETED:
+                                    self._dispatches.pop(dispatch.id)
+                                    await self._update_dispatch_schedule_and_notify(
+                                        None, dispatch, next_event_timer
+                                    )
+
+                                    await self._lifecycle_events_tx.send(
+                                        Deleted(dispatch=dispatch)
+                                    )
+
+                        case StreamRetrying():
+                            is_retry_attempt = True
+
+                        case StreamStarted():
+                            if is_retry_attempt:
+                                _logger.info(
+                                    "Dispatch stream restarted, getting dispatches"
+                                )
+                                await self._fetch(next_event_timer)
+                                is_retry_attempt = False
+
+                        case StreamFatalError():
+                            pass
 
     async def _execute_scheduled_event(self, dispatch: Dispatch, timer: Timer) -> None:
         """Execute a scheduled event.

src/frequenz/dispatch/_dispatcher.py

Lines changed: 5 additions & 4 deletions

@@ -12,11 +12,12 @@
 from typing import Awaitable, Callable, Self
 
 from frequenz.channels import Receiver
+from frequenz.client.common.microgrid import MicrogridId
 from frequenz.client.dispatch import DispatchApiClient
 from frequenz.sdk.actor import Actor, BackgroundService
 from typing_extensions import override
 
-from ._actor_dispatcher import ActorDispatcher, DispatchInfo
+from ._actor_dispatcher import ActorDispatcher, DispatchActorId, DispatchInfo
 from ._bg_service import DispatchScheduler, MergeStrategy
 from ._dispatch import Dispatch
 from ._event import DispatchEvent
@@ -202,7 +203,7 @@ async def run():
     def __init__(
         self,
         *,
-        microgrid_id: int,
+        microgrid_id: MicrogridId,
         server_url: str,
         key: str,
         call_timeout: timedelta = timedelta(seconds=60),
@@ -328,8 +329,8 @@ async def start_managing(
 
         self._empty_event.clear()
 
-        def id_identity(dispatch: Dispatch) -> int:
-            return dispatch.id
+        def id_identity(dispatch: Dispatch) -> DispatchActorId:
+            return DispatchActorId(dispatch.id)
 
         dispatcher = ActorDispatcher(
             actor_factory=actor_factory,
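For callers, the visible part of this change is the constructor now taking a MicrogridId instead of a bare int. A hedged construction sketch (assuming Dispatcher is the public entry point exported by frequenz.dispatch and that MicrogridId wraps an integer ID; the URL and key are placeholders):

from datetime import timedelta

from frequenz.client.common.microgrid import MicrogridId
from frequenz.dispatch import Dispatcher

dispatcher = Dispatcher(
    microgrid_id=MicrogridId(123),  # previously: microgrid_id=123
    server_url="grpc://dispatch.example.com:50051",  # placeholder URL
    key="my-api-key",  # placeholder key
    call_timeout=timedelta(seconds=60),
)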

src/frequenz/dispatch/_merge_strategies.py

Lines changed: 16 additions & 5 deletions

@@ -5,23 +5,34 @@
 
 import logging
 from collections.abc import Mapping
+from sys import maxsize
+from typing import Any
 
+from frequenz.client.dispatch.types import DispatchId
 from typing_extensions import override
 
+from ._actor_dispatcher import DispatchActorId
 from ._bg_service import MergeStrategy
 from ._dispatch import Dispatch
 
 
+def _hash_positive(args: Any) -> int:
+    """Make a positive hash."""
+    return hash(args) + maxsize + 1
+
+
 class MergeByType(MergeStrategy):
     """Merge running intervals based on the dispatch type."""
 
     @override
-    def identity(self, dispatch: Dispatch) -> int:
+    def identity(self, dispatch: Dispatch) -> DispatchActorId:
         """Identity function for the merge criteria."""
-        return hash(dispatch.type)
+        return DispatchActorId(_hash_positive(dispatch.type))
 
     @override
-    def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool:
+    def filter(
+        self, dispatches: Mapping[DispatchId, Dispatch], dispatch: Dispatch
+    ) -> bool:
         """Filter dispatches based on the merge strategy.
 
         Keeps start events.
@@ -53,6 +64,6 @@ class MergeByTypeTarget(MergeByType):
     """Merge running intervals based on the dispatch type and target."""
 
     @override
-    def identity(self, dispatch: Dispatch) -> int:
+    def identity(self, dispatch: Dispatch) -> DispatchActorId:
         """Identity function for the merge criteria."""
-        return hash((dispatch.type, tuple(dispatch.target)))
+        return DispatchActorId(_hash_positive((dispatch.type, tuple(dispatch.target))))
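The new _hash_positive helper exists because Python's hash() may be negative, while the integer fed into DispatchActorId (via BaseId) presumably has to be non-negative; shifting by sys.maxsize + 1 maps the whole hash range into non-negative values. A self-contained check of that property (the helper is copied from the diff; the non-negativity requirement of BaseId is an assumption):

from sys import maxsize


def _hash_positive(args: object) -> int:
    """Make a positive hash (same as the helper added above)."""
    return hash(args) + maxsize + 1


# CPython hashes fit in a signed machine word, i.e. [-maxsize - 1, maxsize],
# so the shifted value always lands in [0, 2 * maxsize + 1].
value = _hash_positive(("EXAMPLE_TYPE", (1, 2, 3)))
assert 0 <= value <= 2 * maxsize + 1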
