Skip to content

Commit 44e1fcf

Browse files
committed
Wait for dispatches to be fetched before creating a receiver
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 85057ff commit 44e1fcf

File tree

3 files changed

+44
-4
lines changed

3 files changed

+44
-4
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ This release introduces a more flexible and powerful mechanism for managing disp
99
* `Dispatcher.start` is no longer `async`. Remove `await` when calling it.
1010
* Two properties have been replaced by methods that require a type as parameter.
1111
* `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, dispatch_type: str)`.
12-
* `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, merge_strategy: MergeStrategy)`.
12+
* `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, merge_strategy: MergeStrategy, initial_fetch_timeout: timedelta)`.
1313
* The managing actor constructor no longer requires the `dispatch_type` parameter. Instead you're expected to pass the type to the new-receiver function.
1414
* The `DispatchManagingActor` class has been renamed to `DispatchActorsService`.
1515
* It's interface has been simplified and now only requires an actor factory and a running status receiver.
@@ -19,6 +19,7 @@ This release introduces a more flexible and powerful mechanism for managing disp
1919

2020
## New Features
2121

22+
* A new parameter `initial_fetch_timeout` has been added to `Dispatcher.new_running_state_event_receiver` and related methods. It allows you to specify a timeout for the initial fetch of dispatches. After the timeout, startup continues as normal.
2223
* A new feature "merge strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
2324
* The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1600.
2425
* Actor management with dispatches has been simplified:

src/frequenz/dispatch/_bg_service.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,9 @@ def __init__(
129129
always at index 0.
130130
"""
131131

132+
self._initial_fetch_event = asyncio.Event()
133+
"""The initial fetch event."""
134+
132135
# pylint: disable=redefined-builtin
133136
def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
134137
"""Create a new receiver for lifecycle events.
@@ -144,7 +147,11 @@ def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]:
144147
)
145148

146149
async def new_running_state_event_receiver(
147-
self, type: str, *, merge_strategy: MergeStrategy | None = None
150+
self,
151+
type: str,
152+
*,
153+
merge_strategy: MergeStrategy | None = None,
154+
initial_fetch_timeout: timedelta = timedelta(seconds=10),
148155
) -> Receiver[Dispatch]:
149156
"""Create a new receiver for running state events of the specified type.
150157
@@ -168,12 +175,25 @@ async def new_running_state_event_receiver(
168175
While merging, stop events are ignored as long as at least one
169176
merge-criteria-matching dispatch remains active.
170177
178+
Will wait up to `initial_fetch_timeout` for the initial fetch to
179+
complete to ensure that all dispatches are available but to also avoid
180+
waiting indefinitely.
181+
171182
Args:
172183
type: The type of events to receive.
173184
merge_strategy: The merge strategy to use.
185+
initial_fetch_timeout: The timeout for the initial fetch.
186+
174187
Returns:
175188
A new receiver for running state status.
176189
"""
190+
191+
# Wait with timeout for the initial fetch to complete
192+
await asyncio.wait_for(
193+
self._initial_fetch_event.wait(),
194+
timeout=initial_fetch_timeout.total_seconds(),
195+
)
196+
177197
# Find all matching dispatches based on the type and collect them
178198
dispatches = [
179199
dispatch for dispatch in self._dispatches.values() if dispatch.type == type
@@ -272,6 +292,8 @@ async def _fetch(self) -> None:
272292
This is used for the initial fetch and for re-fetching all dispatches
273293
if the connection was lost.
274294
"""
295+
self._initial_fetch_event.clear()
296+
275297
old_dispatches = self._dispatches
276298
self._dispatches = {}
277299

@@ -311,6 +333,8 @@ async def _fetch(self) -> None:
311333
dispatch._set_deleted() # pylint: disable=protected-access
312334
await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch))
313335

336+
self._initial_fetch_event.set()
337+
314338
async def _update_dispatch_schedule_and_notify(
315339
self, dispatch: Dispatch | None, old_dispatch: Dispatch | None
316340
) -> None:

src/frequenz/dispatch/_dispatcher.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import asyncio
99
import logging
1010
from asyncio import Event
11+
from datetime import timedelta
1112
from typing import Awaitable, Callable
1213

1314
from frequenz.channels import Receiver
@@ -240,6 +241,7 @@ async def start_dispatching(
240241
[DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor]
241242
],
242243
merge_strategy: MergeStrategy | None = None,
244+
initial_fetch_timeout: timedelta = timedelta(seconds=10),
243245
) -> None:
244246
"""Manage actors for a given dispatch type.
245247
@@ -254,6 +256,10 @@ async def start_dispatching(
254256
dispatch_type: The type of the dispatch to manage.
255257
actor_factory: The factory to create actors.
256258
merge_strategy: The strategy to merge running intervals.
259+
initial_fetch_timeout: Timeout for the initial fetch of dispatches.
260+
After the timeout, the receiver will continue to start and
261+
listen for new dispatches, while the initial fetch is still
262+
running in the background.
257263
"""
258264
dispatcher = self._actor_dispatchers.get(dispatch_type)
259265

@@ -271,7 +277,9 @@ def id_identity(dispatch: Dispatch) -> int:
271277
dispatcher = ActorDispatcher(
272278
actor_factory=actor_factory,
273279
running_status_receiver=await self.new_running_state_event_receiver(
274-
dispatch_type, merge_strategy=merge_strategy
280+
dispatch_type,
281+
merge_strategy=merge_strategy,
282+
initial_fetch_timeout=initial_fetch_timeout,
275283
),
276284
dispatch_identity=(
277285
id_identity if merge_strategy is None else merge_strategy.identity
@@ -317,6 +325,7 @@ async def new_running_state_event_receiver(
317325
dispatch_type: str,
318326
*,
319327
merge_strategy: MergeStrategy | None = None,
328+
initial_fetch_timeout: timedelta = timedelta(seconds=10),
320329
) -> Receiver[Dispatch]:
321330
"""Return running state event receiver.
322331
@@ -363,10 +372,16 @@ async def new_running_state_event_receiver(
363372
Args:
364373
dispatch_type: The type of the dispatch to listen for.
365374
merge_strategy: The type of the strategy to merge running intervals.
375+
initial_fetch_timeout: Timeout for the initial fetch of dispatches.
376+
After the timeout, the receiver will continue to start and
377+
listen for new dispatches, while the initial fetch is still
378+
running in the background
366379
367380
Returns:
368381
A new receiver for dispatches whose running status changed.
369382
"""
370383
return await self._bg_service.new_running_state_event_receiver(
371-
dispatch_type, merge_strategy=merge_strategy
384+
dispatch_type,
385+
merge_strategy=merge_strategy,
386+
initial_fetch_timeout=initial_fetch_timeout,
372387
)

0 commit comments

Comments
 (0)