diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 4fe06aa..c39b1cd 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,19 +6,19 @@ This release introduces a more flexible and powerful mechanism for managing disp ## Upgrading +* `Dispatcher.start` is no longer `async`. Remove `await` when calling it. * Two properties have been replaced by methods that require a type as parameter. * `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, dispatch_type: str)`. - * `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, unify_running_intervals: bool)`. + * `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, merge_strategy: MergeStrategy)`. * The managing actor constructor no longer requires the `dispatch_type` parameter. Instead you're expected to pass the type to the new-receiver function. * The `DispatchManagingActor` class has been renamed to `DispatchActorsService`. * It's interface has been simplified and now only requires an actor factory and a running status receiver. - * It only supports a single actor at a time now. + * It only starts/stops a single actor at a time now instead of a set of actors. * Refer to the updated [usage example](https://frequenz-floss.github.io/frequenz-dispatch-python/latest/reference/frequenz/dispatch/#frequenz.dispatch.DispatchActorsService) for more information. * `DispatchUpdate` was renamed to `DispatchInfo`. ## New Features -* A new feature "merger strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00. - +* A new feature "merge strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00. * The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1600. - +* Actor management with dispatches has been simplified. Calling `Dispatcher.start_dispatching(dispatch_type, actor_factory, merge_strategy)` will begin managing your actor for the given type and merge strategy. All you need provide is an actor factory. To stop dispatching for your type, call `Dispatcher.stop_dispatching(dispatch_type)`. diff --git a/pyproject.toml b/pyproject.toml index babf892..b344d18 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,7 +39,7 @@ dependencies = [ # mkdocs.yml file when changing the version here (look for the config key # plugins.mkdocstrings.handlers.python.import) "frequenz-sdk >= 1.0.0-rc1302, < 1.0.0-rc1600", - "frequenz-channels >= 1.3.0, < 2.0.0", + "frequenz-channels >= 1.6.1, < 2.0.0", "frequenz-client-dispatch >= 0.8.4, < 0.9.0", ] dynamic = ["version"] diff --git a/src/frequenz/dispatch/__init__.py b/src/frequenz/dispatch/__init__.py index f239803..ac4bafb 100644 --- a/src/frequenz/dispatch/__init__.py +++ b/src/frequenz/dispatch/__init__.py @@ -20,7 +20,7 @@ from ._dispatch import Dispatch from ._dispatcher import Dispatcher from ._event import Created, Deleted, DispatchEvent, Updated -from ._merge_strategies import MergeByIdentity, MergeByType, MergeByTypeTarget +from ._merge_strategies import MergeByType, MergeByTypeTarget __all__ = [ "Created", @@ -32,7 +32,6 @@ "ActorDispatcher", "DispatchInfo", "MergeStrategy", - "MergeByIdentity", "MergeByType", "MergeByTypeTarget", ] diff --git a/src/frequenz/dispatch/_actor_dispatcher.py b/src/frequenz/dispatch/_actor_dispatcher.py index 6262f3a..9ae2b6d 100644 --- a/src/frequenz/dispatch/_actor_dispatcher.py +++ b/src/frequenz/dispatch/_actor_dispatcher.py @@ -41,7 +41,7 @@ class ActorDispatcher(BackgroundService): import os import asyncio from typing import override - from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchInfo + from frequenz.dispatch import Dispatcher, ActorDispatcher, DispatchInfo from frequenz.client.dispatch.types import TargetComponents from frequenz.client.common.microgrid.components import ComponentCategory from frequenz.channels import Receiver, Broadcast, select, selected_from @@ -125,7 +125,7 @@ async def main(): status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE") - managing_actor = DispatchManagingActor( + managing_actor = ActorDispatcher( actor_factory=MyActor.new_with_dispatch, running_status_receiver=status_receiver, ) @@ -138,6 +138,7 @@ def __init__( self, actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor], running_status_receiver: Receiver[Dispatch], + dispatch_identity: Callable[[Dispatch], int] | None = None, ) -> None: """Initialize the dispatch handler. @@ -145,11 +146,17 @@ def __init__( actor_factory: A callable that creates an actor with some initial dispatch information. running_status_receiver: The receiver for dispatch running status changes. + dispatch_identity: A function to identify to which actor a dispatch refers. + By default, it uses the dispatch ID. """ super().__init__() + self._dispatch_identity: Callable[[Dispatch], int] = ( + dispatch_identity if dispatch_identity else lambda d: d.id + ) + self._dispatch_rx = running_status_receiver self._actor_factory = actor_factory - self._actor: Actor | None = None + self._actors: dict[int, Actor] = {} self._updates_channel = Broadcast[DispatchInfo]( name="dispatch_updates_channel", resend_latest=True ) @@ -167,22 +174,36 @@ async def _start_actor(self, dispatch: Dispatch) -> None: options=dispatch.payload, ) - if self._actor: + actor: Actor | None = self._actors.get(self._dispatch_identity(dispatch)) + + if actor: sent_str = "" if self._updates_sender is not None: sent_str = ", sent a dispatch update instead of creating a new actor" await self._updates_sender.send(dispatch_update) - _logger.warning( + _logger.info( "Actor for dispatch type %r is already running%s", dispatch.type, sent_str, ) else: - _logger.info("Starting actor for dispatch type %r", dispatch.type) - self._actor = self._actor_factory( - dispatch_update, self._updates_channel.new_receiver() - ) - self._actor.start() + try: + _logger.info("Starting actor for dispatch type %r", dispatch.type) + actor = self._actor_factory( + dispatch_update, + self._updates_channel.new_receiver(limit=1, warn_on_overflow=False), + ) + self._actors[self._dispatch_identity(dispatch)] = actor + + actor.start() + + except Exception as e: # pylint: disable=broad-except + _logger.error( + "Failed to start actor for dispatch type %r: %s", + dispatch.type, + e, + exc_info=True, + ) async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None: """Stop all actors. @@ -191,13 +212,12 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None: stopping_dispatch: The dispatch that is stopping the actor. msg: The message to be passed to the actors being stopped. """ - if self._actor is None: + if actor := self._actors.pop(self._dispatch_identity(stopping_dispatch), None): + await actor.stop(msg) + else: _logger.warning( "Actor for dispatch type %r is not running", stopping_dispatch.type ) - else: - await self._actor.stop(msg) - self._actor = None async def _run(self) -> None: """Wait for dispatches and handle them.""" diff --git a/src/frequenz/dispatch/_bg_service.py b/src/frequenz/dispatch/_bg_service.py index c54c204..f5a0a84 100644 --- a/src/frequenz/dispatch/_bg_service.py +++ b/src/frequenz/dispatch/_bg_service.py @@ -31,6 +31,10 @@ class MergeStrategy(ABC): """Base class for strategies to merge running intervals.""" + @abstractmethod + def identity(self, dispatch: Dispatch) -> int: + """Identity function for the merge criteria.""" + @abstractmethod def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool: """Filter dispatches based on the strategy. @@ -154,12 +158,9 @@ async def new_running_state_event_receiver( dispatches of the same type and target * `None` — no merging, just send all events - You can make your own strategy by subclassing: - - * [`MergeByIdentity`][frequenz.dispatch.MergeByIdentity] — Merges - dispatches based on a user defined identity function - * [`MergeStrategy`][frequenz.dispatch.MergeStrategy] — Merges based - on a user defined filter function + You can make your own identity-based strategy by subclassing `MergeByType` and overriding + the `identity()` method. If you require a more complex strategy, you can subclass + `MergeStrategy` directly and implement both the `identity()` and `filter()` methods. Running intervals from multiple dispatches will be merged, according to the chosen strategy. diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index f2eca19..6397338 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -3,16 +3,27 @@ """A highlevel interface for the dispatch API.""" +from __future__ import annotations + +import asyncio +import logging +from asyncio import Event +from typing import Callable from frequenz.channels import Receiver from frequenz.client.dispatch import Client +from frequenz.sdk.actor import Actor, BackgroundService +from typing_extensions import override +from ._actor_dispatcher import ActorDispatcher, DispatchInfo from ._bg_service import DispatchScheduler, MergeStrategy from ._dispatch import Dispatch from ._event import DispatchEvent +_logger = logging.getLogger(__name__) + -class Dispatcher: +class Dispatcher(BackgroundService): """A highlevel interface for the dispatch API. This class provides a highlevel interface to the dispatch API. @@ -173,16 +184,103 @@ def __init__( server_url: The URL of the dispatch service. key: The key to access the service. """ + super().__init__() + self._client = Client(server_url=server_url, key=key) self._bg_service = DispatchScheduler( microgrid_id, self._client, ) + self._actor_dispatchers: dict[str, ActorDispatcher] = {} + self._empty_event = Event() + self._empty_event.set() - async def start(self) -> None: + @override + def start(self) -> None: """Start the local dispatch service.""" self._bg_service.start() + @property + @override + def is_running(self) -> bool: + """Whether the local dispatch service is running.""" + return self._bg_service.is_running + + @override + async def wait(self) -> None: + """Wait until all actor dispatches are stopped.""" + await asyncio.gather(self._bg_service.wait(), self._empty_event.wait()) + + self._actor_dispatchers.clear() + + @override + def cancel(self, msg: str | None = None) -> None: + """Stop the local dispatch service.""" + self._bg_service.cancel(msg) + + for instance in self._actor_dispatchers.values(): + instance.cancel() + + async def start_dispatching( + self, + dispatch_type: str, + *, + actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor], + merge_strategy: MergeStrategy | None = None, + ) -> None: + """Manage actors for a given dispatch type. + + Creates and manages an ActorDispatcher for the given type that will + start, stop and reconfigure actors based on received dispatches. + + You can await the `Dispatcher` instance to block until all types + registered with `start_dispatching()` are stopped using + `stop_dispatching()` + + Args: + dispatch_type: The type of the dispatch to manage. + actor_factory: The factory to create actors. + merge_strategy: The strategy to merge running intervals. + """ + dispatcher = self._actor_dispatchers.get(dispatch_type) + + if dispatcher is not None: + _logger.debug( + "Ignoring duplicate actor dispatcher request for %r", dispatch_type + ) + return + + self._empty_event.clear() + + def id_identity(dispatch: Dispatch) -> int: + return dispatch.id + + dispatcher = ActorDispatcher( + actor_factory=actor_factory, + running_status_receiver=await self.new_running_state_event_receiver( + dispatch_type, merge_strategy=merge_strategy + ), + dispatch_identity=( + id_identity if merge_strategy is None else merge_strategy.identity + ), + ) + + self._actor_dispatchers[dispatch_type] = dispatcher + dispatcher.start() + + async def stop_dispatching(self, dispatch_type: str) -> None: + """Stop managing actors for a given dispatch type. + + Args: + dispatch_type: The type of the dispatch to stop managing. + """ + dispatcher = self._actor_dispatchers.pop(dispatch_type, None) + if dispatcher is not None: + await dispatcher.stop() + + if not self._actor_dispatchers: + self._empty_event.set() + @property def client(self) -> Client: """Return the client.""" diff --git a/src/frequenz/dispatch/_merge_strategies.py b/src/frequenz/dispatch/_merge_strategies.py index 3753546..e222b0a 100644 --- a/src/frequenz/dispatch/_merge_strategies.py +++ b/src/frequenz/dispatch/_merge_strategies.py @@ -4,7 +4,6 @@ """Different merge strategies for dispatch running state events.""" import logging -from abc import abstractmethod from collections.abc import Mapping from typing_extensions import override @@ -13,12 +12,13 @@ from ._dispatch import Dispatch -class MergeByIdentity(MergeStrategy): - """Merge running intervals based on a dispatch configuration.""" +class MergeByType(MergeStrategy): + """Merge running intervals based on the dispatch type.""" - @abstractmethod + @override def identity(self, dispatch: Dispatch) -> int: """Identity function for the merge criteria.""" + return hash(dispatch.type) @override def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool: @@ -49,19 +49,10 @@ def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool return not other_dispatches_running -class MergeByType(MergeByIdentity): - """Merge running intervals based on the dispatch type.""" - - @override - def identity(self, dispatch: Dispatch) -> int: - """Identity function for the merge criteria.""" - return hash(dispatch.type) - - class MergeByTypeTarget(MergeByType): """Merge running intervals based on the dispatch type and target.""" @override def identity(self, dispatch: Dispatch) -> int: """Identity function for the merge criteria.""" - return hash((dispatch.type, dispatch.target)) + return hash((dispatch.type, tuple(dispatch.target))) diff --git a/tests/test_frequenz_dispatch.py b/tests/test_frequenz_dispatch.py index 0145bee..5a8c454 100644 --- a/tests/test_frequenz_dispatch.py +++ b/tests/test_frequenz_dispatch.py @@ -701,6 +701,8 @@ async def test_at_least_one_running_filter( recurrence=RecurrenceRule(), type="TEST_TYPE", ) + _ = merge_strategy.identity(Dispatch(dispatch)) + lifecycle = service.new_lifecycle_events_receiver("TEST_TYPE") await client.create(**to_create_params(microgrid_id, dispatch)) await lifecycle.receive() diff --git a/tests/test_mananging_actor.py b/tests/test_mananging_actor.py index 6463558..6d8bf92 100644 --- a/tests/test_mananging_actor.py +++ b/tests/test_mananging_actor.py @@ -8,20 +8,38 @@ import logging from dataclasses import dataclass, replace from datetime import datetime, timedelta, timezone -from typing import AsyncIterator, Iterator, cast +from typing import AsyncIterator, Callable, Iterator, cast +from unittest.mock import patch import async_solipsism +import pytest import time_machine from frequenz.channels import Broadcast, Receiver, Sender +from frequenz.client.dispatch import recurrence from frequenz.client.dispatch.recurrence import Frequency +from frequenz.client.dispatch.test.client import FakeClient from frequenz.client.dispatch.test.generator import DispatchGenerator from frequenz.sdk.actor import Actor from pytest import fixture -from frequenz.dispatch import ActorDispatcher, Dispatch, DispatchInfo +from frequenz.dispatch import ( + ActorDispatcher, + Dispatch, + Dispatcher, + DispatchInfo, + MergeByType, + MergeByTypeTarget, + MergeStrategy, +) from frequenz.dispatch._bg_service import DispatchScheduler +@fixture +def generator() -> DispatchGenerator: + """Return a dispatch generator.""" + return DispatchGenerator() + + @fixture def event_loop_policy() -> async_solipsism.EventLoopPolicy: """Set the event loop policy to use async_solipsism.""" @@ -69,21 +87,13 @@ class TestEnv: running_status_sender: Sender[Dispatch] generator: DispatchGenerator = DispatchGenerator() - @property - def actor(self) -> MockActor | None: + def actor(self, identity: int) -> MockActor: """Return the actor.""" # pylint: disable=protected-access - if self.actors_service._actor is None: - return None - return cast(MockActor, self.actors_service._actor) + assert identity in self.actors_service._actors + return cast(MockActor, self.actors_service._actors[identity]) # pylint: enable=protected-access - @property - def updates_receiver(self) -> Receiver[DispatchInfo]: - """Return the updates receiver.""" - assert self.actor is not None - return self.actor.receiver - @fixture async def test_env() -> AsyncIterator[TestEnv]: @@ -93,6 +103,7 @@ async def test_env() -> AsyncIterator[TestEnv]: actors_service = ActorDispatcher( actor_factory=MockActor, running_status_receiver=channel.new_receiver(), + dispatch_identity=lambda dispatch: dispatch.id, ) actors_service.start() @@ -116,6 +127,7 @@ async def test_simple_start_stop( dispatch = test_env.generator.generate_dispatch() dispatch = replace( dispatch, + id=1, active=True, dry_run=False, duration=duration, @@ -135,16 +147,14 @@ async def test_simple_start_stop( await asyncio.sleep(1) logging.info("Sent dispatch") - assert test_env.actor is not None - event = test_env.actor.initial_dispatch + event = test_env.actor(1).initial_dispatch assert event.options == {"test": True} assert event.components == dispatch.target assert event.dry_run is False logging.info("Received dispatch") - assert test_env.actor is not None - assert test_env.actor.is_running is True + assert test_env.actor(1).is_running is True fake_time.shift(duration) await test_env.running_status_sender.send(Dispatch(dispatch)) @@ -152,7 +162,9 @@ async def test_simple_start_stop( # Give await actor.stop a chance to run await asyncio.sleep(1) - assert test_env.actor is None + # pylint: disable=protected-access + assert 1 not in test_env.actors_service._actors + # pylint: enable=protected-access def test_heapq_dispatch_compare(test_env: TestEnv) -> None: @@ -209,6 +221,7 @@ async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) - dispatch = test_env.generator.generate_dispatch() dispatch = replace( dispatch, + id=1, dry_run=True, active=True, start_time=_now(), @@ -224,14 +237,12 @@ async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) - fake_time.shift(timedelta(seconds=1)) await asyncio.sleep(1) - assert test_env.actor is not None - event = test_env.actor.initial_dispatch + event = test_env.actor(1).initial_dispatch assert event.dry_run is dispatch.dry_run assert event.components == dispatch.target assert event.options == dispatch.payload - assert test_env.actor is not None - assert test_env.actor.is_running is True + assert test_env.actor(1).is_running is True assert dispatch.duration is not None fake_time.shift(dispatch.duration) @@ -240,4 +251,79 @@ async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) - # Give await actor.stop a chance to run await asyncio.sleep(1) - assert test_env.actor is None + +@pytest.mark.parametrize("strategy", [MergeByTypeTarget(), MergeByType(), None]) +async def test_manage_abstraction( + fake_time: time_machine.Coordinates, + generator: DispatchGenerator, + strategy: MergeStrategy | None, +) -> None: + """Test Dispatcher.start_dispatching sets up correctly.""" + identity: Callable[[Dispatch], int] = ( + strategy.identity if strategy else lambda dispatch: dispatch.id + ) + + class MyFakeClient(FakeClient): + """Fake client for testing.""" + + def __init__(self, *, server_url: str, key: str): + assert server_url + assert key + super().__init__() + + mid = 1 + + # Patch `Client` class in Dispatcher with MyFakeClient + with patch("frequenz.dispatch._dispatcher.Client", MyFakeClient): + dispatcher = Dispatcher( + microgrid_id=mid, server_url="grpc://test-url", key="test-key" + ) + dispatcher.start() + + channel = Broadcast[Dispatch](name="dispatch ready test channel") + sender = channel.new_sender() + + async def new_mock_receiver( + _: Dispatcher, dispatch_type: str, *, merge_strategy: MergeStrategy | None + ) -> Receiver[Dispatch]: + assert dispatch_type == "MANAGE_TEST" + assert merge_strategy is strategy + return channel.new_receiver() + + with patch( + "frequenz.dispatch._dispatcher.Dispatcher.new_running_state_event_receiver", + new_mock_receiver, + ): + await dispatcher.start_dispatching( + dispatch_type="MANAGE_TEST", + actor_factory=MockActor, + merge_strategy=strategy, + ) + + # pylint: disable=protected-access + assert "MANAGE_TEST" in dispatcher._actor_dispatchers + actor_manager = dispatcher._actor_dispatchers["MANAGE_TEST"] + assert actor_manager._actor_factory == MockActor + + dispatch = Dispatch( + replace( + generator.generate_dispatch(), + start_time=_now(), + duration=timedelta(minutes=10), + recurrence=recurrence.RecurrenceRule(), + active=True, + type="MANAGE_TEST", + ) + ) + + fake_time.move_to(dispatch.start_time + timedelta(seconds=1)) + assert dispatch.started + + # Send a dispatch to start an actor instance + await sender.send(dispatch) + + # Give the actor a chance to start + await asyncio.sleep(1) + + # Check if actor instance is created + assert identity(dispatch) in actor_manager._actors