From 27252a8e5a048ec7f6547e9098310454536ae799 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Tue, 28 Jan 2025 19:39:24 +0100 Subject: [PATCH 01/17] ActorDispatcher: Support dispatches referring to different actors Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_actor_dispatcher.py | 27 ++++++++++------- tests/test_mananging_actor.py | 35 ++++++++-------------- 2 files changed, 30 insertions(+), 32 deletions(-) diff --git a/src/frequenz/dispatch/_actor_dispatcher.py b/src/frequenz/dispatch/_actor_dispatcher.py index 6262f3a..962c91e 100644 --- a/src/frequenz/dispatch/_actor_dispatcher.py +++ b/src/frequenz/dispatch/_actor_dispatcher.py @@ -41,7 +41,7 @@ class ActorDispatcher(BackgroundService): import os import asyncio from typing import override - from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchInfo + from frequenz.dispatch import Dispatcher, ActorDispatcher, DispatchInfo from frequenz.client.dispatch.types import TargetComponents from frequenz.client.common.microgrid.components import ComponentCategory from frequenz.channels import Receiver, Broadcast, select, selected_from @@ -125,9 +125,10 @@ async def main(): status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE") - managing_actor = DispatchManagingActor( + managing_actor = ActorDispatcher( actor_factory=MyActor.new_with_dispatch, running_status_receiver=status_receiver, + map_dispatch=lambda dispatch: dispatch.id, ) await run(managing_actor) @@ -138,6 +139,7 @@ def __init__( self, actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor], running_status_receiver: Receiver[Dispatch], + map_dispatch: Callable[[Dispatch], int], ) -> None: """Initialize the dispatch handler. @@ -145,11 +147,13 @@ def __init__( actor_factory: A callable that creates an actor with some initial dispatch information. running_status_receiver: The receiver for dispatch running status changes. + map_dispatch: A function to identify to which actor a dispatch refers. """ super().__init__() + self._map_dispatch = map_dispatch self._dispatch_rx = running_status_receiver self._actor_factory = actor_factory - self._actor: Actor | None = None + self._actors: dict[int, Actor] = {} self._updates_channel = Broadcast[DispatchInfo]( name="dispatch_updates_channel", resend_latest=True ) @@ -167,7 +171,9 @@ async def _start_actor(self, dispatch: Dispatch) -> None: options=dispatch.payload, ) - if self._actor: + actor: Actor | None = self._actors.get(self._map_dispatch(dispatch)) + + if actor: sent_str = "" if self._updates_sender is not None: sent_str = ", sent a dispatch update instead of creating a new actor" @@ -179,10 +185,12 @@ async def _start_actor(self, dispatch: Dispatch) -> None: ) else: _logger.info("Starting actor for dispatch type %r", dispatch.type) - self._actor = self._actor_factory( + actor = self._actor_factory( dispatch_update, self._updates_channel.new_receiver() ) - self._actor.start() + self._actors[self._map_dispatch(dispatch)] = actor + + actor.start() async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None: """Stop all actors. @@ -191,13 +199,12 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None: stopping_dispatch: The dispatch that is stopping the actor. msg: The message to be passed to the actors being stopped. """ - if self._actor is None: + if actor := self._actors.pop(self._map_dispatch(stopping_dispatch), None): + await actor.stop(msg) + else: _logger.warning( "Actor for dispatch type %r is not running", stopping_dispatch.type ) - else: - await self._actor.stop(msg) - self._actor = None async def _run(self) -> None: """Wait for dispatches and handle them.""" diff --git a/tests/test_mananging_actor.py b/tests/test_mananging_actor.py index 6463558..07a2aab 100644 --- a/tests/test_mananging_actor.py +++ b/tests/test_mananging_actor.py @@ -69,21 +69,13 @@ class TestEnv: running_status_sender: Sender[Dispatch] generator: DispatchGenerator = DispatchGenerator() - @property - def actor(self) -> MockActor | None: + def actor(self, identity: int) -> MockActor: """Return the actor.""" # pylint: disable=protected-access - if self.actors_service._actor is None: - return None - return cast(MockActor, self.actors_service._actor) + assert identity in self.actors_service._actors + return cast(MockActor, self.actors_service._actors[identity]) # pylint: enable=protected-access - @property - def updates_receiver(self) -> Receiver[DispatchInfo]: - """Return the updates receiver.""" - assert self.actor is not None - return self.actor.receiver - @fixture async def test_env() -> AsyncIterator[TestEnv]: @@ -93,6 +85,7 @@ async def test_env() -> AsyncIterator[TestEnv]: actors_service = ActorDispatcher( actor_factory=MockActor, running_status_receiver=channel.new_receiver(), + map_dispatch=lambda dispatch: dispatch.id, ) actors_service.start() @@ -116,6 +109,7 @@ async def test_simple_start_stop( dispatch = test_env.generator.generate_dispatch() dispatch = replace( dispatch, + id=1, active=True, dry_run=False, duration=duration, @@ -135,16 +129,14 @@ async def test_simple_start_stop( await asyncio.sleep(1) logging.info("Sent dispatch") - assert test_env.actor is not None - event = test_env.actor.initial_dispatch + event = test_env.actor(1).initial_dispatch assert event.options == {"test": True} assert event.components == dispatch.target assert event.dry_run is False logging.info("Received dispatch") - assert test_env.actor is not None - assert test_env.actor.is_running is True + assert test_env.actor(1).is_running is True fake_time.shift(duration) await test_env.running_status_sender.send(Dispatch(dispatch)) @@ -152,7 +144,9 @@ async def test_simple_start_stop( # Give await actor.stop a chance to run await asyncio.sleep(1) - assert test_env.actor is None + # pylint: disable=protected-access + assert 1 not in test_env.actors_service._actors + # pylint: enable=protected-access def test_heapq_dispatch_compare(test_env: TestEnv) -> None: @@ -209,6 +203,7 @@ async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) - dispatch = test_env.generator.generate_dispatch() dispatch = replace( dispatch, + id=1, dry_run=True, active=True, start_time=_now(), @@ -224,14 +219,12 @@ async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) - fake_time.shift(timedelta(seconds=1)) await asyncio.sleep(1) - assert test_env.actor is not None - event = test_env.actor.initial_dispatch + event = test_env.actor(1).initial_dispatch assert event.dry_run is dispatch.dry_run assert event.components == dispatch.target assert event.options == dispatch.payload - assert test_env.actor is not None - assert test_env.actor.is_running is True + assert test_env.actor(1).is_running is True assert dispatch.duration is not None fake_time.shift(dispatch.duration) @@ -239,5 +232,3 @@ async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) - # Give await actor.stop a chance to run await asyncio.sleep(1) - - assert test_env.actor is None From df2f8a1760e12c01822851a8385be78e08c6fa53 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Mon, 10 Feb 2025 17:35:44 +0100 Subject: [PATCH 02/17] Remove unused async keyword from Dispatcher.start Signed-off-by: Mathias L. Baumann --- RELEASE_NOTES.md | 1 + src/frequenz/dispatch/_dispatcher.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 4fe06aa..4ccc8fe 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,6 +6,7 @@ This release introduces a more flexible and powerful mechanism for managing disp ## Upgrading +* `Dispatcher.start` is no longer `async`. Remove `await` when calling it. * Two properties have been replaced by methods that require a type as parameter. * `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, dispatch_type: str)`. * `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, unify_running_intervals: bool)`. diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index f2eca19..a05974f 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -179,7 +179,7 @@ def __init__( self._client, ) - async def start(self) -> None: + def start(self) -> None: """Start the local dispatch service.""" self._bg_service.start() From 5e7bfb97f91087778d47c4df6b6a99c707e2d5f3 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Tue, 4 Feb 2025 19:45:43 +0100 Subject: [PATCH 03/17] Fix broken MergeByTypeTarge.identity function Pending: I still need to investigate why the existing tests didn't catch this. Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_merge_strategies.py | 2 +- tests/test_frequenz_dispatch.py | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/frequenz/dispatch/_merge_strategies.py b/src/frequenz/dispatch/_merge_strategies.py index 3753546..46d9d27 100644 --- a/src/frequenz/dispatch/_merge_strategies.py +++ b/src/frequenz/dispatch/_merge_strategies.py @@ -64,4 +64,4 @@ class MergeByTypeTarget(MergeByType): @override def identity(self, dispatch: Dispatch) -> int: """Identity function for the merge criteria.""" - return hash((dispatch.type, dispatch.target)) + return hash((dispatch.type, tuple(dispatch.target))) diff --git a/tests/test_frequenz_dispatch.py b/tests/test_frequenz_dispatch.py index 0145bee..a9f127d 100644 --- a/tests/test_frequenz_dispatch.py +++ b/tests/test_frequenz_dispatch.py @@ -24,6 +24,7 @@ Deleted, Dispatch, DispatchEvent, + MergeByIdentity, MergeByType, MergeByTypeTarget, MergeStrategy, @@ -678,7 +679,7 @@ async def test_multiple_dispatches_sequential_intervals_merge( async def test_at_least_one_running_filter( fake_time: time_machine.Coordinates, generator: DispatchGenerator, - merge_strategy: MergeStrategy, + merge_strategy: MergeByIdentity, ) -> None: """Test scenarios directly tied to the _at_least_one_running logic.""" microgrid_id = randint(1, 100) @@ -701,6 +702,8 @@ async def test_at_least_one_running_filter( recurrence=RecurrenceRule(), type="TEST_TYPE", ) + _ = merge_strategy.identity(Dispatch(dispatch)) + lifecycle = service.new_lifecycle_events_receiver("TEST_TYPE") await client.create(**to_create_params(microgrid_id, dispatch)) await lifecycle.receive() From e4fea21d1157c7fd6c6caed84c394619579176f4 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Tue, 4 Feb 2025 19:48:28 +0100 Subject: [PATCH 04/17] Add `Dispatcher.manage` to manage dispatchable actors Signed-off-by: Mathias L. Baumann --- RELEASE_NOTES.md | 3 +- src/frequenz/dispatch/_dispatcher.py | 50 ++++++++++++++ tests/test_mananging_actor.py | 100 ++++++++++++++++++++++++++- 3 files changed, 149 insertions(+), 4 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 4ccc8fe..c138efa 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -20,6 +20,5 @@ This release introduces a more flexible and powerful mechanism for managing disp ## New Features * A new feature "merger strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00. - * The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1600. - +* A new function `Dispatcher.manage` was added to simplify dispatchable actor management initialization. diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index a05974f..f49b547 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -4,12 +4,20 @@ """A highlevel interface for the dispatch API.""" +import logging +from typing import Callable + from frequenz.channels import Receiver from frequenz.client.dispatch import Client +from frequenz.sdk.actor import Actor +from ._actor_dispatcher import ActorDispatcher, DispatchInfo from ._bg_service import DispatchScheduler, MergeStrategy from ._dispatch import Dispatch from ._event import DispatchEvent +from ._merge_strategies import MergeByIdentity + +_logger = logging.getLogger(__name__) class Dispatcher: @@ -178,11 +186,53 @@ def __init__( microgrid_id, self._client, ) + self._actor_dispatchers: dict[str, ActorDispatcher] = {} def start(self) -> None: """Start the local dispatch service.""" self._bg_service.start() + async def manage( + self, + dispatch_type: str, + *, + actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor], + merge_strategy: MergeByIdentity | None = None, + ) -> None: + """Manage actors for a given dispatch type. + + Creates and manages an ActorDispatcher for the given type that will + start, stop and reconfigure actors based on received dispatches. + + Args: + dispatch_type: The type of the dispatch to manage. + actor_factory: The factory to create actors. + merge_strategy: The strategy to merge running intervals. + """ + dispatcher = self._actor_dispatchers.get(dispatch_type) + + if dispatcher is not None: + _logger.debug( + "Ignoring duplicate actor dispatcher request for %r", dispatch_type + ) + return + + def id_identity(dispatch: Dispatch) -> int: + return dispatch.id + + dispatcher = ActorDispatcher( + actor_factory=actor_factory, + running_status_receiver=await self.new_running_state_event_receiver( + dispatch_type, merge_strategy=merge_strategy + ), + map_dispatch=( + id_identity if merge_strategy is None else merge_strategy.identity + ), + ) + + self._actor_dispatchers[dispatch_type] = dispatcher + dispatcher.start() + @property def client(self) -> Client: """Return the client.""" diff --git a/tests/test_mananging_actor.py b/tests/test_mananging_actor.py index 07a2aab..0d67c9f 100644 --- a/tests/test_mananging_actor.py +++ b/tests/test_mananging_actor.py @@ -8,20 +8,39 @@ import logging from dataclasses import dataclass, replace from datetime import datetime, timedelta, timezone -from typing import AsyncIterator, Iterator, cast +from typing import AsyncIterator, Callable, Iterator, cast +from unittest.mock import patch import async_solipsism +import pytest import time_machine from frequenz.channels import Broadcast, Receiver, Sender +from frequenz.client.dispatch import recurrence from frequenz.client.dispatch.recurrence import Frequency +from frequenz.client.dispatch.test.client import FakeClient from frequenz.client.dispatch.test.generator import DispatchGenerator from frequenz.sdk.actor import Actor from pytest import fixture -from frequenz.dispatch import ActorDispatcher, Dispatch, DispatchInfo +from frequenz.dispatch import ( + ActorDispatcher, + Dispatch, + Dispatcher, + DispatchInfo, + MergeByIdentity, + MergeByType, + MergeByTypeTarget, + MergeStrategy, +) from frequenz.dispatch._bg_service import DispatchScheduler +@fixture +def generator() -> DispatchGenerator: + """Return a dispatch generator.""" + return DispatchGenerator() + + @fixture def event_loop_policy() -> async_solipsism.EventLoopPolicy: """Set the event loop policy to use async_solipsism.""" @@ -232,3 +251,80 @@ async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) - # Give await actor.stop a chance to run await asyncio.sleep(1) + + +@pytest.mark.parametrize("strategy", [MergeByTypeTarget(), MergeByType(), None]) +async def test_manage_abstraction( + fake_time: time_machine.Coordinates, + generator: DispatchGenerator, + strategy: MergeByIdentity | None, +) -> None: + """Test Dispatcher.manage sets up correctly.""" + identity: Callable[[Dispatch], int] = ( + strategy.identity if strategy else lambda dispatch: dispatch.id + ) + + class MyFakeClient(FakeClient): + """Fake client for testing.""" + + def __init__(self, *, server_url: str, key: str): + assert server_url + assert key + super().__init__() + + mid = 1 + + # Patch `Client` class in Dispatcher with MyFakeClient + with patch("frequenz.dispatch._dispatcher.Client", MyFakeClient): + dispatcher = Dispatcher( + microgrid_id=mid, server_url="grpc://test-url", key="test-key" + ) + dispatcher.start() + + channel = Broadcast[Dispatch](name="dispatch ready test channel") + sender = channel.new_sender() + + async def new_mock_receiver( + _: Dispatcher, dispatch_type: str, *, merge_strategy: MergeStrategy | None + ) -> Receiver[Dispatch]: + assert dispatch_type == "MANAGE_TEST" + assert merge_strategy is strategy + return channel.new_receiver() + + with patch( + "frequenz.dispatch._dispatcher.Dispatcher.new_running_state_event_receiver", + new_mock_receiver, + ): + await dispatcher.manage( + dispatch_type="MANAGE_TEST", + actor_factory=MockActor, + merge_strategy=strategy, + ) + + # pylint: disable=protected-access + assert "MANAGE_TEST" in dispatcher._actor_dispatchers + actor_manager = dispatcher._actor_dispatchers["MANAGE_TEST"] + assert actor_manager._actor_factory == MockActor + + dispatch = Dispatch( + replace( + generator.generate_dispatch(), + start_time=_now(), + duration=timedelta(minutes=10), + recurrence=recurrence.RecurrenceRule(), + active=True, + type="MANAGE_TEST", + ) + ) + + fake_time.move_to(dispatch.start_time + timedelta(seconds=1)) + assert dispatch.started + + # Send a dispatch to start an actor instance + await sender.send(dispatch) + + # Give the actor a chance to start + await asyncio.sleep(1) + + # Check if actor instance is created + assert identity(dispatch) in actor_manager._actors From df88828e9cc4be0e46b591e8a5d979e39853203d Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Mon, 10 Feb 2025 15:27:48 +0100 Subject: [PATCH 05/17] Limit status update receiver queue to 1 and disable warning Signed-off-by: Mathias L. Baumann --- pyproject.toml | 2 +- src/frequenz/dispatch/_actor_dispatcher.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index babf892..b344d18 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,7 +39,7 @@ dependencies = [ # mkdocs.yml file when changing the version here (look for the config key # plugins.mkdocstrings.handlers.python.import) "frequenz-sdk >= 1.0.0-rc1302, < 1.0.0-rc1600", - "frequenz-channels >= 1.3.0, < 2.0.0", + "frequenz-channels >= 1.6.1, < 2.0.0", "frequenz-client-dispatch >= 0.8.4, < 0.9.0", ] dynamic = ["version"] diff --git a/src/frequenz/dispatch/_actor_dispatcher.py b/src/frequenz/dispatch/_actor_dispatcher.py index 962c91e..b173d44 100644 --- a/src/frequenz/dispatch/_actor_dispatcher.py +++ b/src/frequenz/dispatch/_actor_dispatcher.py @@ -186,7 +186,8 @@ async def _start_actor(self, dispatch: Dispatch) -> None: else: _logger.info("Starting actor for dispatch type %r", dispatch.type) actor = self._actor_factory( - dispatch_update, self._updates_channel.new_receiver() + dispatch_update, + self._updates_channel.new_receiver(limit=1, warn_on_overflow=False), ) self._actors[self._map_dispatch(dispatch)] = actor From 2c29ab71e966cb3280804bd00d271cfea9097400 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Mon, 10 Feb 2025 15:32:14 +0100 Subject: [PATCH 06/17] Rename: Use start/stop dispatching terminology Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_dispatcher.py | 12 +++++++++++- tests/test_mananging_actor.py | 4 ++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index f49b547..bf7711a 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -192,7 +192,7 @@ def start(self) -> None: """Start the local dispatch service.""" self._bg_service.start() - async def manage( + async def start_dispatching( self, dispatch_type: str, *, @@ -233,6 +233,16 @@ def id_identity(dispatch: Dispatch) -> int: self._actor_dispatchers[dispatch_type] = dispatcher dispatcher.start() + async def stop_dispatching(self, dispatch_type: str) -> None: + """Stop managing actors for a given dispatch type. + + Args: + dispatch_type: The type of the dispatch to stop managing. + """ + dispatcher = self._actor_dispatchers.pop(dispatch_type, None) + if dispatcher is not None: + await dispatcher.stop() + @property def client(self) -> Client: """Return the client.""" diff --git a/tests/test_mananging_actor.py b/tests/test_mananging_actor.py index 0d67c9f..f3a9121 100644 --- a/tests/test_mananging_actor.py +++ b/tests/test_mananging_actor.py @@ -259,7 +259,7 @@ async def test_manage_abstraction( generator: DispatchGenerator, strategy: MergeByIdentity | None, ) -> None: - """Test Dispatcher.manage sets up correctly.""" + """Test Dispatcher.start_dispatching sets up correctly.""" identity: Callable[[Dispatch], int] = ( strategy.identity if strategy else lambda dispatch: dispatch.id ) @@ -295,7 +295,7 @@ async def new_mock_receiver( "frequenz.dispatch._dispatcher.Dispatcher.new_running_state_event_receiver", new_mock_receiver, ): - await dispatcher.manage( + await dispatcher.start_dispatching( dispatch_type="MANAGE_TEST", actor_factory=MockActor, merge_strategy=strategy, From bd75c0fa9124b5567969b5bbee18f551e41331a8 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Mon, 10 Feb 2025 15:40:36 +0100 Subject: [PATCH 07/17] ActorDispatcher: Rename map_dispatch to dispatch_identity Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_actor_dispatcher.py | 15 ++++++++------- src/frequenz/dispatch/_dispatcher.py | 2 +- tests/test_mananging_actor.py | 2 +- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/frequenz/dispatch/_actor_dispatcher.py b/src/frequenz/dispatch/_actor_dispatcher.py index b173d44..9451e7b 100644 --- a/src/frequenz/dispatch/_actor_dispatcher.py +++ b/src/frequenz/dispatch/_actor_dispatcher.py @@ -128,7 +128,7 @@ async def main(): managing_actor = ActorDispatcher( actor_factory=MyActor.new_with_dispatch, running_status_receiver=status_receiver, - map_dispatch=lambda dispatch: dispatch.id, + dispatch_identity=lambda d: d.id, ) await run(managing_actor) @@ -139,7 +139,7 @@ def __init__( self, actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor], running_status_receiver: Receiver[Dispatch], - map_dispatch: Callable[[Dispatch], int], + dispatch_identity: Callable[[Dispatch], int], ) -> None: """Initialize the dispatch handler. @@ -147,10 +147,11 @@ def __init__( actor_factory: A callable that creates an actor with some initial dispatch information. running_status_receiver: The receiver for dispatch running status changes. - map_dispatch: A function to identify to which actor a dispatch refers. + dispatch_identity: A function to identify to which actor a dispatch refers. + By default, it uses the dispatch ID. """ super().__init__() - self._map_dispatch = map_dispatch + self._dispatch_identity = dispatch_identity self._dispatch_rx = running_status_receiver self._actor_factory = actor_factory self._actors: dict[int, Actor] = {} @@ -171,7 +172,7 @@ async def _start_actor(self, dispatch: Dispatch) -> None: options=dispatch.payload, ) - actor: Actor | None = self._actors.get(self._map_dispatch(dispatch)) + actor: Actor | None = self._actors.get(self._dispatch_identity(dispatch)) if actor: sent_str = "" @@ -189,7 +190,7 @@ async def _start_actor(self, dispatch: Dispatch) -> None: dispatch_update, self._updates_channel.new_receiver(limit=1, warn_on_overflow=False), ) - self._actors[self._map_dispatch(dispatch)] = actor + self._actors[self._dispatch_identity(dispatch)] = actor actor.start() @@ -200,7 +201,7 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None: stopping_dispatch: The dispatch that is stopping the actor. msg: The message to be passed to the actors being stopped. """ - if actor := self._actors.pop(self._map_dispatch(stopping_dispatch), None): + if actor := self._actors.pop(self._dispatch_identity(stopping_dispatch), None): await actor.stop(msg) else: _logger.warning( diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index bf7711a..5236818 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -225,7 +225,7 @@ def id_identity(dispatch: Dispatch) -> int: running_status_receiver=await self.new_running_state_event_receiver( dispatch_type, merge_strategy=merge_strategy ), - map_dispatch=( + dispatch_identity=( id_identity if merge_strategy is None else merge_strategy.identity ), ) diff --git a/tests/test_mananging_actor.py b/tests/test_mananging_actor.py index f3a9121..0d5f107 100644 --- a/tests/test_mananging_actor.py +++ b/tests/test_mananging_actor.py @@ -104,7 +104,7 @@ async def test_env() -> AsyncIterator[TestEnv]: actors_service = ActorDispatcher( actor_factory=MockActor, running_status_receiver=channel.new_receiver(), - map_dispatch=lambda dispatch: dispatch.id, + dispatch_identity=lambda dispatch: dispatch.id, ) actors_service.start() From 07a077e5a7ff88869e576e527414afbc412b1bcc Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Mon, 10 Feb 2025 17:29:34 +0100 Subject: [PATCH 08/17] ActorDispatcher: Make `None` default identity (id) Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_actor_dispatcher.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/frequenz/dispatch/_actor_dispatcher.py b/src/frequenz/dispatch/_actor_dispatcher.py index 9451e7b..48f6b7d 100644 --- a/src/frequenz/dispatch/_actor_dispatcher.py +++ b/src/frequenz/dispatch/_actor_dispatcher.py @@ -128,7 +128,6 @@ async def main(): managing_actor = ActorDispatcher( actor_factory=MyActor.new_with_dispatch, running_status_receiver=status_receiver, - dispatch_identity=lambda d: d.id, ) await run(managing_actor) @@ -139,7 +138,7 @@ def __init__( self, actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor], running_status_receiver: Receiver[Dispatch], - dispatch_identity: Callable[[Dispatch], int], + dispatch_identity: Callable[[Dispatch], int] | None = None, ) -> None: """Initialize the dispatch handler. @@ -151,7 +150,10 @@ def __init__( By default, it uses the dispatch ID. """ super().__init__() - self._dispatch_identity = dispatch_identity + self._dispatch_identity: Callable[[Dispatch], int] = ( + dispatch_identity if dispatch_identity else lambda d: d.id + ) + self._dispatch_rx = running_status_receiver self._actor_factory = actor_factory self._actors: dict[int, Actor] = {} From 5ddb9034b62038a0fd58e67fa7d3ceac62c6cb08 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Tue, 11 Feb 2025 14:00:53 +0100 Subject: [PATCH 09/17] Update readme, make it ready for release Signed-off-by: Mathias L. Baumann --- RELEASE_NOTES.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index c138efa..c39b1cd 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -9,16 +9,16 @@ This release introduces a more flexible and powerful mechanism for managing disp * `Dispatcher.start` is no longer `async`. Remove `await` when calling it. * Two properties have been replaced by methods that require a type as parameter. * `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, dispatch_type: str)`. - * `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, unify_running_intervals: bool)`. + * `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, merge_strategy: MergeStrategy)`. * The managing actor constructor no longer requires the `dispatch_type` parameter. Instead you're expected to pass the type to the new-receiver function. * The `DispatchManagingActor` class has been renamed to `DispatchActorsService`. * It's interface has been simplified and now only requires an actor factory and a running status receiver. - * It only supports a single actor at a time now. + * It only starts/stops a single actor at a time now instead of a set of actors. * Refer to the updated [usage example](https://frequenz-floss.github.io/frequenz-dispatch-python/latest/reference/frequenz/dispatch/#frequenz.dispatch.DispatchActorsService) for more information. * `DispatchUpdate` was renamed to `DispatchInfo`. ## New Features -* A new feature "merger strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00. +* A new feature "merge strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00. * The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1600. -* A new function `Dispatcher.manage` was added to simplify dispatchable actor management initialization. +* Actor management with dispatches has been simplified. Calling `Dispatcher.start_dispatching(dispatch_type, actor_factory, merge_strategy)` will begin managing your actor for the given type and merge strategy. All you need provide is an actor factory. To stop dispatching for your type, call `Dispatcher.stop_dispatching(dispatch_type)`. From 484c90bae659ca254e7c9f6980380586df6af1d1 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Tue, 11 Feb 2025 18:56:39 +0100 Subject: [PATCH 10/17] Make the dispatcher instance awaitable Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_dispatcher.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index 5236818..fa6dcb0 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -3,9 +3,11 @@ """A highlevel interface for the dispatch API.""" +from __future__ import annotations import logging -from typing import Callable +from asyncio import Event +from typing import Any, Callable, Generator from frequenz.channels import Receiver from frequenz.client.dispatch import Client @@ -187,6 +189,8 @@ def __init__( self._client, ) self._actor_dispatchers: dict[str, ActorDispatcher] = {} + self._empty_event = Event() + self._empty_event.set() def start(self) -> None: """Start the local dispatch service.""" @@ -204,6 +208,10 @@ async def start_dispatching( Creates and manages an ActorDispatcher for the given type that will start, stop and reconfigure actors based on received dispatches. + You can await the `Dispatcher` instance to block until all types + registered with `start_dispatching()` are stopped using + `stop_dispatching()` + Args: dispatch_type: The type of the dispatch to manage. actor_factory: The factory to create actors. @@ -217,6 +225,8 @@ async def start_dispatching( ) return + self._empty_event.clear() + def id_identity(dispatch: Dispatch) -> int: return dispatch.id @@ -243,6 +253,13 @@ async def stop_dispatching(self, dispatch_type: str) -> None: if dispatcher is not None: await dispatcher.stop() + if not self._actor_dispatchers: + self._empty_event.set() + + def __await__(self) -> Generator[Any, None, bool]: + """Wait until all actor dispatches are stopped.""" + return self._empty_event.wait().__await__() + @property def client(self) -> Client: """Return the client.""" From 94b8efe91b73a1ed9607c9429bc180f275e9d83e Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Tue, 11 Feb 2025 18:58:06 +0100 Subject: [PATCH 11/17] Catch & log exceptions on actor startup Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_actor_dispatcher.py | 25 +++++++++++++++------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/src/frequenz/dispatch/_actor_dispatcher.py b/src/frequenz/dispatch/_actor_dispatcher.py index 48f6b7d..be90a1b 100644 --- a/src/frequenz/dispatch/_actor_dispatcher.py +++ b/src/frequenz/dispatch/_actor_dispatcher.py @@ -187,14 +187,23 @@ async def _start_actor(self, dispatch: Dispatch) -> None: sent_str, ) else: - _logger.info("Starting actor for dispatch type %r", dispatch.type) - actor = self._actor_factory( - dispatch_update, - self._updates_channel.new_receiver(limit=1, warn_on_overflow=False), - ) - self._actors[self._dispatch_identity(dispatch)] = actor - - actor.start() + try: + _logger.info("Starting actor for dispatch type %r", dispatch.type) + actor = self._actor_factory( + dispatch_update, + self._updates_channel.new_receiver(limit=1, warn_on_overflow=False), + ) + self._actors[self._dispatch_identity(dispatch)] = actor + + actor.start() + + except Exception as e: # pylint: disable=broad-except + _logger.error( + "Failed to start actor for dispatch type %r: %s", + dispatch.type, + e, + exc_info=True, + ) async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None: """Stop all actors. From f3b414694cbb547436abac3065283a92786bb806 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Wed, 12 Feb 2025 19:00:26 +0100 Subject: [PATCH 12/17] Don't warn for changing dispatch parameters: It's a normal action Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_actor_dispatcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frequenz/dispatch/_actor_dispatcher.py b/src/frequenz/dispatch/_actor_dispatcher.py index be90a1b..9ae2b6d 100644 --- a/src/frequenz/dispatch/_actor_dispatcher.py +++ b/src/frequenz/dispatch/_actor_dispatcher.py @@ -181,7 +181,7 @@ async def _start_actor(self, dispatch: Dispatch) -> None: if self._updates_sender is not None: sent_str = ", sent a dispatch update instead of creating a new actor" await self._updates_sender.send(dispatch_update) - _logger.warning( + _logger.info( "Actor for dispatch type %r is already running%s", dispatch.type, sent_str, From e0c5d8d91cf6ff98358893094e7746556420c8bb Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Wed, 12 Feb 2025 19:00:56 +0100 Subject: [PATCH 13/17] Turn `Dispatcher` into a background service Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_dispatcher.py | 35 ++++++++++++++++++++++------ 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index fa6dcb0..7b0c1f0 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -7,11 +7,12 @@ import logging from asyncio import Event -from typing import Any, Callable, Generator +from typing import Callable from frequenz.channels import Receiver from frequenz.client.dispatch import Client -from frequenz.sdk.actor import Actor +from frequenz.sdk.actor import Actor, BackgroundService +from typing_extensions import override from ._actor_dispatcher import ActorDispatcher, DispatchInfo from ._bg_service import DispatchScheduler, MergeStrategy @@ -22,7 +23,7 @@ _logger = logging.getLogger(__name__) -class Dispatcher: +class Dispatcher(BackgroundService): """A highlevel interface for the dispatch API. This class provides a highlevel interface to the dispatch API. @@ -183,6 +184,8 @@ def __init__( server_url: The URL of the dispatch service. key: The key to access the service. """ + super().__init__(name="Dispatcher") + self._client = Client(server_url=server_url, key=key) self._bg_service = DispatchScheduler( microgrid_id, @@ -192,10 +195,32 @@ def __init__( self._empty_event = Event() self._empty_event.set() + @override def start(self) -> None: """Start the local dispatch service.""" self._bg_service.start() + @property + @override + def is_running(self) -> bool: + """Whether the local dispatch service is running.""" + return self._bg_service.is_running + + @override + async def wait(self) -> None: + """Wait until all actor dispatches are stopped.""" + await self._empty_event.wait() + + @override + def cancel(self, msg: str | None = None) -> None: + """Stop the local dispatch service.""" + self._bg_service.cancel(msg) + + for instance in self._actor_dispatchers.values(): + instance.cancel() + + self._actor_dispatchers.clear() + async def start_dispatching( self, dispatch_type: str, @@ -256,10 +281,6 @@ async def stop_dispatching(self, dispatch_type: str) -> None: if not self._actor_dispatchers: self._empty_event.set() - def __await__(self) -> Generator[Any, None, bool]: - """Wait until all actor dispatches are stopped.""" - return self._empty_event.wait().__await__() - @property def client(self) -> Client: """Return the client.""" From 23cc65faf2518606c7209191920b72daf5d3a55d Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Mon, 17 Feb 2025 17:15:02 +0100 Subject: [PATCH 14/17] Wait for bg services as well in `Dispatcher.wait` Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_dispatcher.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index 7b0c1f0..3e210e5 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -5,6 +5,7 @@ from __future__ import annotations +import asyncio import logging from asyncio import Event from typing import Callable @@ -209,7 +210,7 @@ def is_running(self) -> bool: @override async def wait(self) -> None: """Wait until all actor dispatches are stopped.""" - await self._empty_event.wait() + await asyncio.gather(self._bg_service.wait(), self._empty_event.wait()) @override def cancel(self, msg: str | None = None) -> None: From 9747b1553df4ffef9af5fcd3cd8221af4f2f818a Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Mon, 17 Feb 2025 17:26:35 +0100 Subject: [PATCH 15/17] Don't pass a name to Dispatcher background service Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_dispatcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index 3e210e5..f560845 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -185,7 +185,7 @@ def __init__( server_url: The URL of the dispatch service. key: The key to access the service. """ - super().__init__(name="Dispatcher") + super().__init__() self._client = Client(server_url=server_url, key=key) self._bg_service = DispatchScheduler( From 7d43537aab75dce925477c3f8540ae792607c714 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Mon, 17 Feb 2025 17:26:50 +0100 Subject: [PATCH 16/17] Clear dispatches after waiting for them to stop Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_dispatcher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index f560845..ac00bd0 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -212,6 +212,8 @@ async def wait(self) -> None: """Wait until all actor dispatches are stopped.""" await asyncio.gather(self._bg_service.wait(), self._empty_event.wait()) + self._actor_dispatchers.clear() + @override def cancel(self, msg: str | None = None) -> None: """Stop the local dispatch service.""" @@ -220,8 +222,6 @@ def cancel(self, msg: str | None = None) -> None: for instance in self._actor_dispatchers.values(): instance.cancel() - self._actor_dispatchers.clear() - async def start_dispatching( self, dispatch_type: str, From 6c34157ae86538b71ee7e8a1c7d61a0470750ad9 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Mon, 17 Feb 2025 17:48:09 +0100 Subject: [PATCH 17/17] Remove abstraction level `MergeByIdentity` and make it root instead Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/__init__.py | 3 +-- src/frequenz/dispatch/_bg_service.py | 13 +++++++------ src/frequenz/dispatch/_dispatcher.py | 3 +-- src/frequenz/dispatch/_merge_strategies.py | 17 ++++------------- tests/test_frequenz_dispatch.py | 3 +-- tests/test_mananging_actor.py | 3 +-- 6 files changed, 15 insertions(+), 27 deletions(-) diff --git a/src/frequenz/dispatch/__init__.py b/src/frequenz/dispatch/__init__.py index f239803..ac4bafb 100644 --- a/src/frequenz/dispatch/__init__.py +++ b/src/frequenz/dispatch/__init__.py @@ -20,7 +20,7 @@ from ._dispatch import Dispatch from ._dispatcher import Dispatcher from ._event import Created, Deleted, DispatchEvent, Updated -from ._merge_strategies import MergeByIdentity, MergeByType, MergeByTypeTarget +from ._merge_strategies import MergeByType, MergeByTypeTarget __all__ = [ "Created", @@ -32,7 +32,6 @@ "ActorDispatcher", "DispatchInfo", "MergeStrategy", - "MergeByIdentity", "MergeByType", "MergeByTypeTarget", ] diff --git a/src/frequenz/dispatch/_bg_service.py b/src/frequenz/dispatch/_bg_service.py index c54c204..f5a0a84 100644 --- a/src/frequenz/dispatch/_bg_service.py +++ b/src/frequenz/dispatch/_bg_service.py @@ -31,6 +31,10 @@ class MergeStrategy(ABC): """Base class for strategies to merge running intervals.""" + @abstractmethod + def identity(self, dispatch: Dispatch) -> int: + """Identity function for the merge criteria.""" + @abstractmethod def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool: """Filter dispatches based on the strategy. @@ -154,12 +158,9 @@ async def new_running_state_event_receiver( dispatches of the same type and target * `None` — no merging, just send all events - You can make your own strategy by subclassing: - - * [`MergeByIdentity`][frequenz.dispatch.MergeByIdentity] — Merges - dispatches based on a user defined identity function - * [`MergeStrategy`][frequenz.dispatch.MergeStrategy] — Merges based - on a user defined filter function + You can make your own identity-based strategy by subclassing `MergeByType` and overriding + the `identity()` method. If you require a more complex strategy, you can subclass + `MergeStrategy` directly and implement both the `identity()` and `filter()` methods. Running intervals from multiple dispatches will be merged, according to the chosen strategy. diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index ac00bd0..6397338 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -19,7 +19,6 @@ from ._bg_service import DispatchScheduler, MergeStrategy from ._dispatch import Dispatch from ._event import DispatchEvent -from ._merge_strategies import MergeByIdentity _logger = logging.getLogger(__name__) @@ -227,7 +226,7 @@ async def start_dispatching( dispatch_type: str, *, actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor], - merge_strategy: MergeByIdentity | None = None, + merge_strategy: MergeStrategy | None = None, ) -> None: """Manage actors for a given dispatch type. diff --git a/src/frequenz/dispatch/_merge_strategies.py b/src/frequenz/dispatch/_merge_strategies.py index 46d9d27..e222b0a 100644 --- a/src/frequenz/dispatch/_merge_strategies.py +++ b/src/frequenz/dispatch/_merge_strategies.py @@ -4,7 +4,6 @@ """Different merge strategies for dispatch running state events.""" import logging -from abc import abstractmethod from collections.abc import Mapping from typing_extensions import override @@ -13,12 +12,13 @@ from ._dispatch import Dispatch -class MergeByIdentity(MergeStrategy): - """Merge running intervals based on a dispatch configuration.""" +class MergeByType(MergeStrategy): + """Merge running intervals based on the dispatch type.""" - @abstractmethod + @override def identity(self, dispatch: Dispatch) -> int: """Identity function for the merge criteria.""" + return hash(dispatch.type) @override def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool: @@ -49,15 +49,6 @@ def filter(self, dispatches: Mapping[int, Dispatch], dispatch: Dispatch) -> bool return not other_dispatches_running -class MergeByType(MergeByIdentity): - """Merge running intervals based on the dispatch type.""" - - @override - def identity(self, dispatch: Dispatch) -> int: - """Identity function for the merge criteria.""" - return hash(dispatch.type) - - class MergeByTypeTarget(MergeByType): """Merge running intervals based on the dispatch type and target.""" diff --git a/tests/test_frequenz_dispatch.py b/tests/test_frequenz_dispatch.py index a9f127d..5a8c454 100644 --- a/tests/test_frequenz_dispatch.py +++ b/tests/test_frequenz_dispatch.py @@ -24,7 +24,6 @@ Deleted, Dispatch, DispatchEvent, - MergeByIdentity, MergeByType, MergeByTypeTarget, MergeStrategy, @@ -679,7 +678,7 @@ async def test_multiple_dispatches_sequential_intervals_merge( async def test_at_least_one_running_filter( fake_time: time_machine.Coordinates, generator: DispatchGenerator, - merge_strategy: MergeByIdentity, + merge_strategy: MergeStrategy, ) -> None: """Test scenarios directly tied to the _at_least_one_running logic.""" microgrid_id = randint(1, 100) diff --git a/tests/test_mananging_actor.py b/tests/test_mananging_actor.py index 0d5f107..6d8bf92 100644 --- a/tests/test_mananging_actor.py +++ b/tests/test_mananging_actor.py @@ -27,7 +27,6 @@ Dispatch, Dispatcher, DispatchInfo, - MergeByIdentity, MergeByType, MergeByTypeTarget, MergeStrategy, @@ -257,7 +256,7 @@ async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) - async def test_manage_abstraction( fake_time: time_machine.Coordinates, generator: DispatchGenerator, - strategy: MergeByIdentity | None, + strategy: MergeStrategy | None, ) -> None: """Test Dispatcher.start_dispatching sets up correctly.""" identity: Callable[[Dispatch], int] = (