Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ This release introduces a more flexible and powerful mechanism for managing disp

## Upgrading

* `Dispatcher.start` is no longer `async`. Remove `await` when calling it.
* Two properties have been replaced by methods that require a type as parameter.
* `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, dispatch_type: str)`.
* `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, unify_running_intervals: bool)`.
Expand All @@ -19,6 +20,5 @@ This release introduces a more flexible and powerful mechanism for managing disp
## New Features

* A new feature "merger strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.

* The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1600.

* A new function `Dispatcher.manage` was added to simplify dispatchable actor management initialization.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ dependencies = [
# mkdocs.yml file when changing the version here (look for the config key
# plugins.mkdocstrings.handlers.python.import)
"frequenz-sdk >= 1.0.0-rc1302, < 1.0.0-rc1600",
"frequenz-channels >= 1.3.0, < 2.0.0",
"frequenz-channels >= 1.6.1, < 2.0.0",
"frequenz-client-dispatch >= 0.8.4, < 0.9.0",
]
dynamic = ["version"]
Expand Down
33 changes: 22 additions & 11 deletions src/frequenz/dispatch/_actor_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class ActorDispatcher(BackgroundService):
import os
import asyncio
from typing import override
from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchInfo
from frequenz.dispatch import Dispatcher, ActorDispatcher, DispatchInfo
from frequenz.client.dispatch.types import TargetComponents
from frequenz.client.common.microgrid.components import ComponentCategory
from frequenz.channels import Receiver, Broadcast, select, selected_from
Expand Down Expand Up @@ -125,7 +125,7 @@ async def main():

status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")

managing_actor = DispatchManagingActor(
managing_actor = ActorDispatcher(
actor_factory=MyActor.new_with_dispatch,
running_status_receiver=status_receiver,
)
Expand All @@ -138,18 +138,25 @@ def __init__(
self,
actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor],
running_status_receiver: Receiver[Dispatch],
dispatch_identity: Callable[[Dispatch], int] | None = None,
) -> None:
"""Initialize the dispatch handler.

Args:
actor_factory: A callable that creates an actor with some initial dispatch
information.
running_status_receiver: The receiver for dispatch running status changes.
dispatch_identity: A function to identify to which actor a dispatch refers.
By default, it uses the dispatch ID.
"""
super().__init__()
self._dispatch_identity: Callable[[Dispatch], int] = (
dispatch_identity if dispatch_identity else lambda d: d.id
)

self._dispatch_rx = running_status_receiver
self._actor_factory = actor_factory
self._actor: Actor | None = None
self._actors: dict[int, Actor] = {}
self._updates_channel = Broadcast[DispatchInfo](
name="dispatch_updates_channel", resend_latest=True
)
Expand All @@ -167,7 +174,9 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
options=dispatch.payload,
)

if self._actor:
actor: Actor | None = self._actors.get(self._dispatch_identity(dispatch))

if actor:
sent_str = ""
if self._updates_sender is not None:
sent_str = ", sent a dispatch update instead of creating a new actor"
Expand All @@ -179,10 +188,13 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
)
else:
_logger.info("Starting actor for dispatch type %r", dispatch.type)
self._actor = self._actor_factory(
dispatch_update, self._updates_channel.new_receiver()
actor = self._actor_factory(
dispatch_update,
self._updates_channel.new_receiver(limit=1, warn_on_overflow=False),
)
self._actor.start()
self._actors[self._dispatch_identity(dispatch)] = actor

actor.start()

async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
"""Stop all actors.
Expand All @@ -191,13 +203,12 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
stopping_dispatch: The dispatch that is stopping the actor.
msg: The message to be passed to the actors being stopped.
"""
if self._actor is None:
if actor := self._actors.pop(self._dispatch_identity(stopping_dispatch), None):
await actor.stop(msg)
else:
_logger.warning(
"Actor for dispatch type %r is not running", stopping_dispatch.type
)
else:
await self._actor.stop(msg)
self._actor = None

async def _run(self) -> None:
"""Wait for dispatches and handle them."""
Expand Down
62 changes: 61 additions & 1 deletion src/frequenz/dispatch/_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,20 @@
"""A highlevel interface for the dispatch API."""


import logging
from typing import Callable

from frequenz.channels import Receiver
from frequenz.client.dispatch import Client
from frequenz.sdk.actor import Actor

from ._actor_dispatcher import ActorDispatcher, DispatchInfo
from ._bg_service import DispatchScheduler, MergeStrategy
from ._dispatch import Dispatch
from ._event import DispatchEvent
from ._merge_strategies import MergeByIdentity

_logger = logging.getLogger(__name__)


class Dispatcher:
Expand Down Expand Up @@ -178,11 +186,63 @@ def __init__(
microgrid_id,
self._client,
)
self._actor_dispatchers: dict[str, ActorDispatcher] = {}

async def start(self) -> None:
def start(self) -> None:
"""Start the local dispatch service."""
self._bg_service.start()

async def start_dispatching(
self,
dispatch_type: str,
*,
actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor],
merge_strategy: MergeByIdentity | None = None,
) -> None:
"""Manage actors for a given dispatch type.

Creates and manages an ActorDispatcher for the given type that will
start, stop and reconfigure actors based on received dispatches.

Args:
dispatch_type: The type of the dispatch to manage.
actor_factory: The factory to create actors.
merge_strategy: The strategy to merge running intervals.
"""
dispatcher = self._actor_dispatchers.get(dispatch_type)

if dispatcher is not None:
_logger.debug(
"Ignoring duplicate actor dispatcher request for %r", dispatch_type
)
return

def id_identity(dispatch: Dispatch) -> int:
return dispatch.id

dispatcher = ActorDispatcher(
actor_factory=actor_factory,
running_status_receiver=await self.new_running_state_event_receiver(
dispatch_type, merge_strategy=merge_strategy
),
dispatch_identity=(
id_identity if merge_strategy is None else merge_strategy.identity
),
)

self._actor_dispatchers[dispatch_type] = dispatcher
dispatcher.start()

async def stop_dispatching(self, dispatch_type: str) -> None:
"""Stop managing actors for a given dispatch type.

Args:
dispatch_type: The type of the dispatch to stop managing.
"""
dispatcher = self._actor_dispatchers.pop(dispatch_type, None)
if dispatcher is not None:
await dispatcher.stop()

@property
def client(self) -> Client:
"""Return the client."""
Expand Down
2 changes: 1 addition & 1 deletion src/frequenz/dispatch/_merge_strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,4 @@ class MergeByTypeTarget(MergeByType):
@override
def identity(self, dispatch: Dispatch) -> int:
"""Identity function for the merge criteria."""
return hash((dispatch.type, dispatch.target))
return hash((dispatch.type, tuple(dispatch.target)))
5 changes: 4 additions & 1 deletion tests/test_frequenz_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
Deleted,
Dispatch,
DispatchEvent,
MergeByIdentity,
MergeByType,
MergeByTypeTarget,
MergeStrategy,
Expand Down Expand Up @@ -678,7 +679,7 @@ async def test_multiple_dispatches_sequential_intervals_merge(
async def test_at_least_one_running_filter(
fake_time: time_machine.Coordinates,
generator: DispatchGenerator,
merge_strategy: MergeStrategy,
merge_strategy: MergeByIdentity,
) -> None:
"""Test scenarios directly tied to the _at_least_one_running logic."""
microgrid_id = randint(1, 100)
Expand All @@ -701,6 +702,8 @@ async def test_at_least_one_running_filter(
recurrence=RecurrenceRule(),
type="TEST_TYPE",
)
_ = merge_strategy.identity(Dispatch(dispatch))

lifecycle = service.new_lifecycle_events_receiver("TEST_TYPE")
await client.create(**to_create_params(microgrid_id, dispatch))
await lifecycle.receive()
Expand Down
Loading
Loading