diff --git a/README.md b/README.md index 2e9ac8a..c253516 100644 --- a/README.md +++ b/README.md @@ -21,8 +21,13 @@ The [`Dispatcher` class](https://frequenz-floss.github.io/frequenz-dispatch-pyth ```python import os -from frequenz.dispatch import Dispatcher from unittest.mock import MagicMock +from datetime import timedelta + +from frequenz.dispatch import Dispatcher, DispatchInfo, MergeByType + +async def create_actor(dispatch: DispatchInfo, receiver: Receiver[DispatchInfo]) -> Actor: + return MagicMock(dispatch=dispatch, receiver=receiver) async def run(): url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051") @@ -30,38 +35,19 @@ async def run(): microgrid_id = 1 - dispatcher = Dispatcher( + async with Dispatcher( microgrid_id=microgrid_id, server_url=url, - key=key - ) - await dispatcher.start() - - actor = MagicMock() # replace with your actor - - changed_running_status_rx = dispatcher.new_running_state_event_receiver("MY_TYPE") - - async for dispatch in changed_running_status_rx: - if dispatch.started: - print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") - if actor.is_running: - actor.reconfigure( - components=dispatch.target, - run_parameters=dispatch.payload, # custom actor parameters - dry_run=dispatch.dry_run, - until=dispatch.until, - ) # this will reconfigure the actor - else: - # this will start a new actor with the given components - # and run it for the duration of the dispatch - actor.start( - components=dispatch.target, - run_parameters=dispatch.payload, # custom actor parameters - dry_run=dispatch.dry_run, - until=dispatch.until, - ) - else: - actor.stop() # this will stop the actor + key=key, + ) as dispatcher: + await dispatcher.start_managing( + dispatch_type="EXAMPLE_TYPE", + actor_factory=create_actor, + merge_strategy=MergeByType(), + retry_interval=timedelta(seconds=10) + ) + + await dispatcher ``` ## Supported Platforms diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index dae035d..fed3e2a 100644 --- 
a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,11 +6,50 @@ This release introduces a more flexible and powerful mechanism for managing disp ## Upgrading +A new simplified way to manage actors has been introduced: + +Change your code from: +```python +dispatcher = Dispatcher( + microgrid_id=microgrid_id, + server_url=url, + key=key +) +dispatcher.start() + +status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE") + +managing_actor = ActorDispatcher( + actor_factory=MyActor.new_with_dispatch, + running_status_receiver=status_receiver, +) + +await run(managing_actor) +``` + +to + +```python +async with Dispatcher( + microgrid_id=microgrid_id, + server_url=url, + key=key +) as dispatcher: + await dispatcher.start_managing( + dispatch_type="EXAMPLE_TYPE", + actor_factory=MyActor.new_with_dispatch, # now async factory! + merge_strategy=MergeByType(), + ) + await dispatcher +``` + +Further changes: + * `Dispatcher.start` is no longer `async`. Remove `await` when calling it. * Two properties have been replaced by methods that require a type as parameter. * `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, dispatch_type: str)`. * `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, merge_strategy: MergeStrategy)`. -* The managing actor constructor no longer requires the `dispatch_type` parameter. Instead you're expected to pass the type to the new-receiver function. +* The managing actor constructor no longer requires the `dispatch_type` parameter. Instead you're expected to pass the type to the new_receiver function. * The `DispatchManagingActor` class has been renamed to `DispatchActorsService`. * It's interface has been simplified and now only requires an actor factory and a running status receiver. * It only starts/stops a single actor at a time now instead of a set of actors. 
@@ -22,4 +61,10 @@ This release introduces a more flexible and powerful mechanism for managing disp * A new feature "merge strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00. * The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1800. -* Actor management with dispatches has been simplified. Calling `Dispatcher.start_dispatching(dispatch_type, actor_factory, merge_strategy)` will begin managing your actor for the given type and merge strategy. All you need provide is an actor factory. To stop dispatching for your type, call `Dispatcher.stop_dispatching(dispatch_type)`. +* Actor management with dispatches has been simplified: + * `Dispatcher.start_managing(dispatch_type, actor_factory, merge_strategy, retry_interval)` to manage your actor for the given type and merge strategy. All you need to provide is an actor factory. + * `Dispatcher.stop_managing(dispatch_type)` to stop dispatching for the given type. + * `Dispatcher.is_managed(dispatch_type)` to check if dispatching is active for the given type. + * Dispatches that failed to start will now be retried after a delay. +* A new method `Dispatcher.wait_for_initialization()` has been added to wait for the initial fetch of dispatches to complete. +* When using `async with Dispatcher(...) as dispatcher`, the dispatcher will first wait for the dispatch service to be initialized before entering the block. 
diff --git a/src/frequenz/dispatch/_actor_dispatcher.py b/src/frequenz/dispatch/_actor_dispatcher.py index 9ae2b6d..8bb76a8 100644 --- a/src/frequenz/dispatch/_actor_dispatcher.py +++ b/src/frequenz/dispatch/_actor_dispatcher.py @@ -7,9 +7,10 @@ import logging from collections.abc import Callable from dataclasses import dataclass -from typing import Any +from datetime import timedelta +from typing import Any, Awaitable -from frequenz.channels import Broadcast, Receiver +from frequenz.channels import Broadcast, Receiver, select from frequenz.client.dispatch.types import TargetComponents from frequenz.sdk.actor import Actor, BackgroundService @@ -116,29 +117,77 @@ async def main(): microgrid_id = 1 - dispatcher = Dispatcher( + async with Dispatcher( microgrid_id=microgrid_id, server_url=url, key=key - ) - dispatcher.start() - - status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE") + ) as dispatcher: + status_receiver = await dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE") - managing_actor = ActorDispatcher( - actor_factory=MyActor.new_with_dispatch, - running_status_receiver=status_receiver, - ) + managing_actor = ActorDispatcher( + actor_factory=MyActor.new_with_dispatch, + running_status_receiver=status_receiver, + ) - await run(managing_actor) + await run(managing_actor) ``` """ - def __init__( + class RetryFailedDispatches: + """Manages the retry of failed dispatches.""" + + def __init__(self, retry_interval: timedelta) -> None: + """Initialize the retry manager. + + Args: + retry_interval: The interval between retries. + """ + self._retry_interval = retry_interval + self._channel = Broadcast[Dispatch](name="retry_channel") + self._sender = self._channel.new_sender() + self._tasks: set[asyncio.Task[None]] = set() + + def new_receiver(self) -> Receiver[Dispatch]: + """Create a new receiver for dispatches to retry. + + Returns: + The receiver. 
+ """ + return self._channel.new_receiver() + + def retry(self, dispatch: Dispatch) -> None: + """Retry a dispatch. + + Args: + dispatch: The dispatch information to retry. + """ + task = asyncio.create_task(self._retry_after_delay(dispatch)) + self._tasks.add(task) + task.add_done_callback(self._tasks.remove) + + async def _retry_after_delay(self, dispatch: Dispatch) -> None: + """Retry a dispatch after a delay. + + Args: + dispatch: The dispatch information to retry. + """ + _logger.info( + "Will retry dispatch %s after %s", + dispatch.id, + self._retry_interval, + ) + await asyncio.sleep(self._retry_interval.total_seconds()) + _logger.info("Retrying dispatch %s now", dispatch.id) + await self._sender.send(dispatch) + + def __init__( # pylint: disable=too-many-arguments, too-many-positional-arguments self, - actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor], + actor_factory: Callable[ + [DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor] + ], running_status_receiver: Receiver[Dispatch], dispatch_identity: Callable[[Dispatch], int] | None = None, + retry_interval: timedelta | None = timedelta(seconds=60), ) -> None: """Initialize the dispatch handler. @@ -148,6 +197,7 @@ def __init__( running_status_receiver: The receiver for dispatch running status changes. dispatch_identity: A function to identify to which actor a dispatch refers. By default, it uses the dispatch ID. + retry_interval: The interval between retries. If `None`, retries are disabled. 
""" super().__init__() self._dispatch_identity: Callable[[Dispatch], int] = ( @@ -161,6 +211,11 @@ def __init__( name="dispatch_updates_channel", resend_latest=True ) self._updates_sender = self._updates_channel.new_sender() + self._retrier = ( + ActorDispatcher.RetryFailedDispatches(retry_interval) + if retry_interval + else None + ) def start(self) -> None: """Start the background service.""" @@ -174,7 +229,8 @@ async def _start_actor(self, dispatch: Dispatch) -> None: options=dispatch.payload, ) - actor: Actor | None = self._actors.get(self._dispatch_identity(dispatch)) + identity = self._dispatch_identity(dispatch) + actor: Actor | None = self._actors.get(identity) if actor: sent_str = "" @@ -189,21 +245,28 @@ async def _start_actor(self, dispatch: Dispatch) -> None: else: try: _logger.info("Starting actor for dispatch type %r", dispatch.type) - actor = self._actor_factory( + actor = await self._actor_factory( dispatch_update, self._updates_channel.new_receiver(limit=1, warn_on_overflow=False), ) - self._actors[self._dispatch_identity(dispatch)] = actor actor.start() except Exception as e: # pylint: disable=broad-except _logger.error( - "Failed to start actor for dispatch type %r: %s", + "Failed to start actor for dispatch type %r", dispatch.type, - e, - exc_info=True, + exc_info=e, ) + if self._retrier: + self._retrier.retry(dispatch) + else: + _logger.error( + "No retry mechanism enabled, dispatch %r failed", dispatch + ) + else: + # No exception occurred, so we can add the actor to the list + self._actors[identity] = actor async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None: """Stop all actors. @@ -212,17 +275,33 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None: stopping_dispatch: The dispatch that is stopping the actor. msg: The message to be passed to the actors being stopped. 
""" - if actor := self._actors.pop(self._dispatch_identity(stopping_dispatch), None): + actor: Actor | None = None + identity = self._dispatch_identity(stopping_dispatch) + + actor = self._actors.get(identity) + + if actor: await actor.stop(msg) + + del self._actors[identity] else: _logger.warning( "Actor for dispatch type %r is not running", stopping_dispatch.type ) async def _run(self) -> None: - """Wait for dispatches and handle them.""" - async for dispatch in self._dispatch_rx: - await self._handle_dispatch(dispatch=dispatch) + """Handle incoming dispatches and retry failed ones once due.""" + if not self._retrier: + async for dispatch in self._dispatch_rx: + await self._handle_dispatch(dispatch) + else: + retry_recv = self._retrier.new_receiver() + + async for selected in select(retry_recv, self._dispatch_rx): + if retry_recv.triggered(selected): + await self._handle_dispatch(selected.message) + elif self._dispatch_rx.triggered(selected): + await self._handle_dispatch(selected.message) async def _handle_dispatch(self, dispatch: Dispatch) -> None: """Handle a dispatch. diff --git a/src/frequenz/dispatch/_bg_service.py b/src/frequenz/dispatch/_bg_service.py index f5a0a84..af638b6 100644 --- a/src/frequenz/dispatch/_bg_service.py +++ b/src/frequenz/dispatch/_bg_service.py @@ -129,6 +129,13 @@ def __init__( always at index 0. """ + self._initial_fetch_event = asyncio.Event() + """The initial fetch event.""" + + async def wait_for_initialization(self) -> None: + """Wait for the initial fetch to complete.""" + await self._initial_fetch_event.wait() + # pylint: disable=redefined-builtin def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]: """Create a new receiver for lifecycle events. 
@@ -144,7 +151,10 @@ def new_lifecycle_events_receiver(self, type: str) -> Receiver[DispatchEvent]: ) async def new_running_state_event_receiver( - self, type: str, *, merge_strategy: MergeStrategy | None = None + self, + type: str, + *, + merge_strategy: MergeStrategy | None = None, ) -> Receiver[Dispatch]: """Create a new receiver for running state events of the specified type. @@ -171,9 +181,16 @@ async def new_running_state_event_receiver( Args: type: The type of events to receive. merge_strategy: The merge strategy to use. + Returns: A new receiver for running state status. + + Raises: + RuntimeError: If the dispatch service is not running. """ + if not self._tasks: + raise RuntimeError("Dispatch service not started") + # Find all matching dispatches based on the type and collect them dispatches = [ dispatch for dispatch in self._dispatches.values() if dispatch.type == type @@ -195,7 +212,8 @@ async def new_running_state_event_receiver( ) for dispatch in dispatches: - await self._send_running_state_change(dispatch) + if dispatch.started: + await self._send_running_state_change(dispatch) return receiver @@ -272,6 +290,8 @@ async def _fetch(self) -> None: This is used for the initial fetch and for re-fetching all dispatches if the connection was lost. 
""" + self._initial_fetch_event.clear() + old_dispatches = self._dispatches self._dispatches = {} @@ -311,6 +331,8 @@ async def _fetch(self) -> None: dispatch._set_deleted() # pylint: disable=protected-access await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch)) + self._initial_fetch_event.set() + async def _update_dispatch_schedule_and_notify( self, dispatch: Dispatch | None, old_dispatch: Dispatch | None ) -> None: diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index 6397338..05be8d4 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -8,7 +8,8 @@ import asyncio import logging from asyncio import Event -from typing import Callable +from datetime import timedelta +from typing import Awaitable, Callable, Self from frequenz.channels import Receiver from frequenz.client.dispatch import Client @@ -27,7 +28,10 @@ class Dispatcher(BackgroundService): """A highlevel interface for the dispatch API. This class provides a highlevel interface to the dispatch API. - It provides two receiver functions: + It provides receivers for various events and management of actors based on + dispatches. + + The receivers shortly explained: * [Lifecycle events receiver][frequenz.dispatch.Dispatcher.new_lifecycle_events_receiver]: Receives an event whenever a dispatch is created, updated or deleted. @@ -41,6 +45,35 @@ class Dispatcher(BackgroundService): Any change that could potentially require the consumer to start, stop or reconfigure itself will cause a message to be sent. 
+ Example: Managing an actor + ```python + import os + from frequenz.dispatch import Dispatcher, MergeByType + from unittest.mock import MagicMock + + async def create_actor(dispatch: DispatchInfo, receiver: Receiver[DispatchInfo]) -> Actor: + return MagicMock(dispatch=dispatch, receiver=receiver) + + async def run(): + url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051") + key = os.getenv("DISPATCH_API_KEY", "some-key") + + microgrid_id = 1 + + async with Dispatcher( + microgrid_id=microgrid_id, + server_url=url, + key=key + ) as dispatcher: + await dispatcher.start_managing( + dispatch_type="DISPATCH_TYPE", + actor_factory=create_actor, + merge_strategy=MergeByType(), + ) + + await dispatcher + ``` + Example: Processing running state change dispatches ```python import os @@ -53,38 +86,36 @@ async def run(): microgrid_id = 1 - dispatcher = Dispatcher( + async with Dispatcher( microgrid_id=microgrid_id, server_url=url, key=key - ) - await dispatcher.start() - - actor = MagicMock() # replace with your actor - - changed_running_status = dispatcher.new_running_state_event_receiver("DISPATCH_TYPE") - - async for dispatch in changed_running_status: - if dispatch.started: - print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") - if actor.is_running: - actor.reconfigure( - components=dispatch.target, - run_parameters=dispatch.payload, # custom actor parameters - dry_run=dispatch.dry_run, - until=dispatch.until, - ) # this will reconfigure the actor + ) as dispatcher: + actor = MagicMock() # replace with your actor + + rs_receiver = await dispatcher.new_running_state_event_receiver("DISPATCH_TYPE") + + async for dispatch in rs_receiver: + if dispatch.started: + print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") + if actor.is_running: + actor.reconfigure( + components=dispatch.target, + run_parameters=dispatch.payload, # custom actor parameters + dry_run=dispatch.dry_run, + until=dispatch.until, + ) # this will reconfigure the 
actor + else: + # this will start a new actor with the given components + # and run it for the duration of the dispatch + actor.start( + components=dispatch.target, + run_parameters=dispatch.payload, # custom actor parameters + dry_run=dispatch.dry_run, + until=dispatch.until, + ) else: - # this will start a new actor with the given components - # and run it for the duration of the dispatch - actor.start( - components=dispatch.target, - run_parameters=dispatch.payload, # custom actor parameters - dry_run=dispatch.dry_run, - until=dispatch.until, - ) - else: - actor.stop() # this will stop the actor + actor.stop() # this will stop the actor ``` Example: Getting notification about dispatch lifecycle events @@ -100,25 +131,23 @@ async def run(): microgrid_id = 1 - dispatcher = Dispatcher( + async with Dispatcher( microgrid_id=microgrid_id, server_url=url, - key=key - ) - await dispatcher.start() # this will start the actor - - events_receiver = dispatcher.new_lifecycle_events_receiver("DISPATCH_TYPE") - - async for event in events_receiver: - match event: - case Created(dispatch): - print(f"A dispatch was created: {dispatch}") - case Deleted(dispatch): - print(f"A dispatch was deleted: {dispatch}") - case Updated(dispatch): - print(f"A dispatch was updated: {dispatch}") - case _ as unhandled: - assert_never(unhandled) + key=key, + ) as dispatcher: + events_receiver = dispatcher.new_lifecycle_events_receiver("DISPATCH_TYPE") + + async for event in events_receiver: + match event: + case Created(dispatch): + print(f"A dispatch was created: {dispatch}") + case Deleted(dispatch): + print(f"A dispatch was deleted: {dispatch}") + case Updated(dispatch): + print(f"A dispatch was updated: {dispatch}") + case _ as unhandled: + assert_never(unhandled) ``` Example: Creating a new dispatch and then modifying it. 
@@ -138,35 +167,33 @@ async def run(): microgrid_id = 1 - dispatcher = Dispatcher( + async with Dispatcher( microgrid_id=microgrid_id, server_url=url, - key=key - ) - await dispatcher.start() # this will start the actor - - # Create a new dispatch - new_dispatch = await dispatcher.client.create( - microgrid_id=microgrid_id, - type="ECHO_FREQUENCY", # replace with your own type - start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10), - duration=timedelta(minutes=5), - target=ComponentCategory.INVERTER, - payload={"font": "Times New Roman"}, # Arbitrary payload data - ) - - # Modify the dispatch - await dispatcher.client.update( - microgrid_id=microgrid_id, - dispatch_id=new_dispatch.id, - new_fields={"duration": timedelta(minutes=10)} - ) - - # Validate the modification - modified_dispatch = await dispatcher.client.get( - microgrid_id=microgrid_id, dispatch_id=new_dispatch.id - ) - assert modified_dispatch.duration == timedelta(minutes=10) + key=key, + ) as dispatcher: + # Create a new dispatch + new_dispatch = await dispatcher.client.create( + microgrid_id=microgrid_id, + type="ECHO_FREQUENCY", # replace with your own type + start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10), + duration=timedelta(minutes=5), + target=ComponentCategory.INVERTER, + payload={"font": "Times New Roman"}, # Arbitrary payload data + ) + + # Modify the dispatch + await dispatcher.client.update( + microgrid_id=microgrid_id, + dispatch_id=new_dispatch.id, + new_fields={"duration": timedelta(minutes=10)} + ) + + # Validate the modification + modified_dispatch = await dispatcher.client.get( + microgrid_id=microgrid_id, dispatch_id=new_dispatch.id + ) + assert modified_dispatch.duration == timedelta(minutes=10) ``` """ @@ -221,26 +248,59 @@ def cancel(self, msg: str | None = None) -> None: for instance in self._actor_dispatchers.values(): instance.cancel() - async def start_dispatching( + async def wait_for_initialization(self) -> None: + """Wait until the background 
service is initialized.""" + await self._bg_service.wait_for_initialization() + + def is_managed(self, dispatch_type: str) -> bool: + """Check if the dispatcher is managing actors for a given dispatch type. + + Args: + dispatch_type: The type of the dispatch to check. + + Returns: + True if the dispatcher is managing actors for the given dispatch type. + """ + return dispatch_type in self._actor_dispatchers + + async def start_managing( self, dispatch_type: str, *, - actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor], + actor_factory: Callable[ + [DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor] + ], merge_strategy: MergeStrategy | None = None, + retry_interval: timedelta = timedelta(seconds=60), ) -> None: """Manage actors for a given dispatch type. - Creates and manages an ActorDispatcher for the given type that will + Creates and manages an + [`ActorDispatcher`][frequenz.dispatch.ActorDispatcher] for the given type that will start, stop and reconfigure actors based on received dispatches. You can await the `Dispatcher` instance to block until all types - registered with `start_dispatching()` are stopped using + registered with `start_managing()` are stopped using `stop_dispatching()` + "Merging" means that when multiple dispatches are active at the same time, + the intervals are merged into one. + + This also decides how instances are mapped from dispatches to actors: + + * [`MergeByType`][frequenz.dispatch.MergeByType] — All dispatches map to + one single instance identified by the dispatch type. + * [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — A + dispatch maps to an instance identified by the dispatch type and target. + So different dispatches with equal type and target will map to the same + instance. + * `None` — No merging, each dispatch maps to a separate instance. + Args: dispatch_type: The type of the dispatch to manage. actor_factory: The factory to create actors. 
merge_strategy: The strategy to merge running intervals. + retry_interval: Retry interval for when actor creation fails. """ dispatcher = self._actor_dispatchers.get(dispatch_type) @@ -258,11 +318,13 @@ def id_identity(dispatch: Dispatch) -> int: dispatcher = ActorDispatcher( actor_factory=actor_factory, running_status_receiver=await self.new_running_state_event_receiver( - dispatch_type, merge_strategy=merge_strategy + dispatch_type, + merge_strategy=merge_strategy, ), dispatch_identity=( id_identity if merge_strategy is None else merge_strategy.identity ), + retry_interval=retry_interval, ) self._actor_dispatchers[dispatch_type] = dispatcher @@ -286,6 +348,19 @@ def client(self) -> Client: """Return the client.""" return self._client + @override + async def __aenter__(self) -> Self: + """Enter an async context. + + Start this background service. + + Returns: + This background service. + """ + await super().__aenter__() + await self.wait_for_initialization() + return self + def new_lifecycle_events_receiver( self, dispatch_type: str ) -> Receiver[DispatchEvent]: @@ -355,5 +430,6 @@ async def new_running_state_event_receiver( A new receiver for dispatches whose running status changed. 
""" return await self._bg_service.new_running_state_event_receiver( - dispatch_type, merge_strategy=merge_strategy + dispatch_type, + merge_strategy=merge_strategy, ) diff --git a/tests/test_frequenz_dispatch.py b/tests/test_frequenz_dispatch.py index 5a8c454..8a0368d 100644 --- a/tests/test_frequenz_dispatch.py +++ b/tests/test_frequenz_dispatch.py @@ -473,12 +473,13 @@ async def test_dispatch_new_but_finished( new_dispatch, active=True, duration=timedelta(seconds=10), - start_time=_now() + timedelta(seconds=5), + start_time=_now() + timedelta(seconds=500), recurrence=RecurrenceRule(), type="TEST_TYPE", ) new_dispatch = await _test_new_dispatch_created(test_env, new_dispatch) + assert new_dispatch.started is False # Advance time to when the new dispatch should still not start fake_time.shift(timedelta(seconds=100)) diff --git a/tests/test_mananging_actor.py b/tests/test_mananging_actor.py index 6d8bf92..c59d12b 100644 --- a/tests/test_mananging_actor.py +++ b/tests/test_mananging_actor.py @@ -5,7 +5,6 @@ import asyncio import heapq -import logging from dataclasses import dataclass, replace from datetime import datetime, timedelta, timezone from typing import AsyncIterator, Callable, Iterator, cast @@ -70,7 +69,6 @@ def __init__( ) -> None: """Initialize the actor.""" super().__init__(name="MockActor") - logging.info("MockActor created") self.initial_dispatch = initial_dispatch self.receiver = receiver @@ -78,6 +76,21 @@ async def _run(self) -> None: while True: await asyncio.sleep(1) + @classmethod + async def create( + cls, initial_dispatch: DispatchInfo, receiver: Receiver[DispatchInfo] + ) -> "MockActor": + """Create a new actor.""" + actor = cls(initial_dispatch, receiver) + return actor + + @classmethod + async def create_fail( + cls, __: DispatchInfo, _: Receiver[DispatchInfo] + ) -> "MockActor": + """Create a new actor.""" + raise ValueError("Failed to create actor") + @dataclass class TestEnv: @@ -101,7 +114,7 @@ async def test_env() -> 
AsyncIterator[TestEnv]: channel = Broadcast[Dispatch](name="dispatch ready test channel") actors_service = ActorDispatcher( - actor_factory=MockActor, + actor_factory=MockActor.create, running_status_receiver=channel.new_receiver(), dispatch_identity=lambda dispatch: dispatch.id, ) @@ -145,15 +158,12 @@ async def test_simple_start_stop( fake_time.shift(timedelta(seconds=1)) await asyncio.sleep(1) await asyncio.sleep(1) - logging.info("Sent dispatch") event = test_env.actor(1).initial_dispatch assert event.options == {"test": True} assert event.components == dispatch.target assert event.dry_run is False - logging.info("Received dispatch") - assert test_env.actor(1).is_running is True fake_time.shift(duration) @@ -167,6 +177,47 @@ async def test_simple_start_stop( # pylint: enable=protected-access +async def test_start_failed( + test_env: TestEnv, fake_time: time_machine.Coordinates +) -> None: + """Test auto-retry after 60 seconds.""" + # pylint: disable=protected-access + test_env.actors_service._actor_factory = MockActor.create_fail + + now = _now() + duration = timedelta(minutes=10) + dispatch = test_env.generator.generate_dispatch() + dispatch = replace( + dispatch, + id=1, + active=True, + dry_run=False, + duration=duration, + start_time=now, + payload={"test": True}, + type="UNIT_TEST", + recurrence=replace( + dispatch.recurrence, + frequency=Frequency.UNSPECIFIED, + ), + ) + + # Send status update to start actor, expect no DispatchInfo for the start + await test_env.running_status_sender.send(Dispatch(dispatch)) + fake_time.shift(timedelta(seconds=1)) + + # Replace failing mock actor factory with a working one + test_env.actors_service._actor_factory = MockActor.create + + # Give retry task time to start + await asyncio.sleep(1) + + fake_time.shift(timedelta(seconds=65)) + await asyncio.sleep(65) + + assert test_env.actor(1).is_running is True + + def test_heapq_dispatch_compare(test_env: TestEnv) -> None: """Test that the heapq compare function works.""" 
dispatch1 = test_env.generator.generate_dispatch() @@ -258,7 +309,7 @@ async def test_manage_abstraction( generator: DispatchGenerator, strategy: MergeStrategy | None, ) -> None: - """Test Dispatcher.start_dispatching sets up correctly.""" + """Test Dispatcher.start_managing sets up correctly.""" identity: Callable[[Dispatch], int] = ( strategy.identity if strategy else lambda dispatch: dispatch.id ) @@ -284,7 +335,10 @@ def __init__(self, *, server_url: str, key: str): sender = channel.new_sender() async def new_mock_receiver( - _: Dispatcher, dispatch_type: str, *, merge_strategy: MergeStrategy | None + _: Dispatcher, + dispatch_type: str, + *, + merge_strategy: MergeStrategy | None, ) -> Receiver[Dispatch]: assert dispatch_type == "MANAGE_TEST" assert merge_strategy is strategy @@ -294,16 +348,18 @@ async def new_mock_receiver( "frequenz.dispatch._dispatcher.Dispatcher.new_running_state_event_receiver", new_mock_receiver, ): - await dispatcher.start_dispatching( + await dispatcher.start_managing( dispatch_type="MANAGE_TEST", - actor_factory=MockActor, + actor_factory=MockActor.create, merge_strategy=strategy, ) # pylint: disable=protected-access assert "MANAGE_TEST" in dispatcher._actor_dispatchers actor_manager = dispatcher._actor_dispatchers["MANAGE_TEST"] - assert actor_manager._actor_factory == MockActor + # pylint: disable=comparison-with-callable + assert actor_manager._actor_factory == MockActor.create + # pylint: enable=comparison-with-callable dispatch = Dispatch( replace(