Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ This release introduces a more flexible and powerful mechanism for managing disp

## Upgrading

* `Dispatcher.start` is no longer `async`. Remove `await` when calling it.
* Two properties have been replaced by methods that require a type as parameter.
* `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, dispatch_type: str)`.
* `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, unify_running_intervals: bool)`.
* `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, merge_strategy: MergeStrategy)`.
* The managing actor constructor no longer requires the `dispatch_type` parameter. Instead you're expected to pass the type to the new-receiver function.
* The `DispatchManagingActor` class has been renamed to `DispatchActorsService`.
* It's interface has been simplified and now only requires an actor factory and a running status receiver.
* It only supports a single actor at a time now.
* It only starts/stops a single actor at a time now instead of a set of actors.
* Refer to the updated [usage example](https://frequenz-floss.github.io/frequenz-dispatch-python/latest/reference/frequenz/dispatch/#frequenz.dispatch.DispatchActorsService) for more information.
* `DispatchUpdate` was renamed to `DispatchInfo`.

## New Features

* A new feature "merger strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.

* A new feature "merge strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
* The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1600.

* Actor management with dispatches has been simplified. Calling `Dispatcher.start_dispatching(dispatch_type, actor_factory, merge_strategy)` will begin managing your actor for the given type and merge strategy. All you need provide is an actor factory. To stop dispatching for your type, call `Dispatcher.stop_dispatching(dispatch_type)`.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ dependencies = [
# mkdocs.yml file when changing the version here (look for the config key
# plugins.mkdocstrings.handlers.python.import)
"frequenz-sdk >= 1.0.0-rc1302, < 1.0.0-rc1600",
"frequenz-channels >= 1.3.0, < 2.0.0",
"frequenz-channels >= 1.6.1, < 2.0.0",
"frequenz-client-dispatch >= 0.8.4, < 0.9.0",
]
dynamic = ["version"]
Expand Down
46 changes: 33 additions & 13 deletions src/frequenz/dispatch/_actor_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class ActorDispatcher(BackgroundService):
import os
import asyncio
from typing import override
from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchInfo
from frequenz.dispatch import Dispatcher, ActorDispatcher, DispatchInfo
from frequenz.client.dispatch.types import TargetComponents
from frequenz.client.common.microgrid.components import ComponentCategory
from frequenz.channels import Receiver, Broadcast, select, selected_from
Expand Down Expand Up @@ -125,7 +125,7 @@ async def main():

status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")

managing_actor = DispatchManagingActor(
managing_actor = ActorDispatcher(
actor_factory=MyActor.new_with_dispatch,
running_status_receiver=status_receiver,
)
Expand All @@ -138,18 +138,25 @@ def __init__(
self,
actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor],
running_status_receiver: Receiver[Dispatch],
dispatch_identity: Callable[[Dispatch], int] | None = None,
) -> None:
"""Initialize the dispatch handler.

Args:
actor_factory: A callable that creates an actor with some initial dispatch
information.
running_status_receiver: The receiver for dispatch running status changes.
dispatch_identity: A function to identify to which actor a dispatch refers.
By default, it uses the dispatch ID.
"""
super().__init__()
self._dispatch_identity: Callable[[Dispatch], int] = (
dispatch_identity if dispatch_identity else lambda d: d.id
)

self._dispatch_rx = running_status_receiver
self._actor_factory = actor_factory
self._actor: Actor | None = None
self._actors: dict[int, Actor] = {}
self._updates_channel = Broadcast[DispatchInfo](
name="dispatch_updates_channel", resend_latest=True
)
Expand All @@ -167,7 +174,9 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
options=dispatch.payload,
)

if self._actor:
actor: Actor | None = self._actors.get(self._dispatch_identity(dispatch))

if actor:
sent_str = ""
if self._updates_sender is not None:
sent_str = ", sent a dispatch update instead of creating a new actor"
Expand All @@ -178,11 +187,23 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
sent_str,
)
else:
_logger.info("Starting actor for dispatch type %r", dispatch.type)
self._actor = self._actor_factory(
dispatch_update, self._updates_channel.new_receiver()
)
self._actor.start()
try:
_logger.info("Starting actor for dispatch type %r", dispatch.type)
actor = self._actor_factory(
dispatch_update,
self._updates_channel.new_receiver(limit=1, warn_on_overflow=False),
)
self._actors[self._dispatch_identity(dispatch)] = actor

actor.start()

except Exception as e: # pylint: disable=broad-except
_logger.error(
"Failed to start actor for dispatch type %r: %s",
dispatch.type,
e,
exc_info=True,
)
Comment on lines +190 to +206
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not for this PR, as it is a separate feature that can be implemented separately, but I think it might be good to do the starting in a background task and retrying forever like the edge app "dispatch subsystem" does (maybe with the new dispatcher we can even get rid of the dispatch subsystem).

Not 100% sure, as doing this is making maybe too many assumptions about what clients want, but maybe it could also be opt-in/out by passing an option like in_background: bool, retries: int | INFINITE).


async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
"""Stop all actors.
Expand All @@ -191,13 +212,12 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
stopping_dispatch: The dispatch that is stopping the actor.
msg: The message to be passed to the actors being stopped.
"""
if self._actor is None:
if actor := self._actors.pop(self._dispatch_identity(stopping_dispatch), None):
await actor.stop(msg)
else:
_logger.warning(
"Actor for dispatch type %r is not running", stopping_dispatch.type
)
else:
await self._actor.stop(msg)
self._actor = None

async def _run(self) -> None:
"""Wait for dispatches and handle them."""
Expand Down
79 changes: 78 additions & 1 deletion src/frequenz/dispatch/_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,23 @@

"""A highlevel interface for the dispatch API."""

from __future__ import annotations

import logging
from asyncio import Event
from typing import Any, Callable, Generator

from frequenz.channels import Receiver
from frequenz.client.dispatch import Client
from frequenz.sdk.actor import Actor

from ._actor_dispatcher import ActorDispatcher, DispatchInfo
from ._bg_service import DispatchScheduler, MergeStrategy
from ._dispatch import Dispatch
from ._event import DispatchEvent
from ._merge_strategies import MergeByIdentity

_logger = logging.getLogger(__name__)


class Dispatcher:
Expand Down Expand Up @@ -178,11 +188,78 @@ def __init__(
microgrid_id,
self._client,
)
self._actor_dispatchers: dict[str, ActorDispatcher] = {}
self._empty_event = Event()
self._empty_event.set()

async def start(self) -> None:
def start(self) -> None:
"""Start the local dispatch service."""
self._bg_service.start()

async def start_dispatching(
self,
dispatch_type: str,
*,
actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor],
merge_strategy: MergeByIdentity | None = None,
) -> None:
"""Manage actors for a given dispatch type.

Creates and manages an ActorDispatcher for the given type that will
start, stop and reconfigure actors based on received dispatches.

You can await the `Dispatcher` instance to block until all types
registered with `start_dispatching()` are stopped using
`stop_dispatching()`

Args:
dispatch_type: The type of the dispatch to manage.
actor_factory: The factory to create actors.
merge_strategy: The strategy to merge running intervals.
"""
dispatcher = self._actor_dispatchers.get(dispatch_type)

if dispatcher is not None:
_logger.debug(
"Ignoring duplicate actor dispatcher request for %r", dispatch_type
)
return

self._empty_event.clear()

def id_identity(dispatch: Dispatch) -> int:
return dispatch.id

dispatcher = ActorDispatcher(
actor_factory=actor_factory,
running_status_receiver=await self.new_running_state_event_receiver(
dispatch_type, merge_strategy=merge_strategy
),
dispatch_identity=(
id_identity if merge_strategy is None else merge_strategy.identity
),
)

self._actor_dispatchers[dispatch_type] = dispatcher
dispatcher.start()

async def stop_dispatching(self, dispatch_type: str) -> None:
"""Stop managing actors for a given dispatch type.

Args:
dispatch_type: The type of the dispatch to stop managing.
"""
dispatcher = self._actor_dispatchers.pop(dispatch_type, None)
if dispatcher is not None:
await dispatcher.stop()

if not self._actor_dispatchers:
self._empty_event.set()

def __await__(self) -> Generator[Any, None, bool]:
"""Wait until all actor dispatches are stopped."""
return self._empty_event.wait().__await__()

@property
def client(self) -> Client:
"""Return the client."""
Expand Down
2 changes: 1 addition & 1 deletion src/frequenz/dispatch/_merge_strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,4 @@ class MergeByTypeTarget(MergeByType):
@override
def identity(self, dispatch: Dispatch) -> int:
"""Identity function for the merge criteria."""
return hash((dispatch.type, dispatch.target))
return hash((dispatch.type, tuple(dispatch.target)))
5 changes: 4 additions & 1 deletion tests/test_frequenz_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
Deleted,
Dispatch,
DispatchEvent,
MergeByIdentity,
MergeByType,
MergeByTypeTarget,
MergeStrategy,
Expand Down Expand Up @@ -678,7 +679,7 @@ async def test_multiple_dispatches_sequential_intervals_merge(
async def test_at_least_one_running_filter(
fake_time: time_machine.Coordinates,
generator: DispatchGenerator,
merge_strategy: MergeStrategy,
merge_strategy: MergeByIdentity,
) -> None:
"""Test scenarios directly tied to the _at_least_one_running logic."""
microgrid_id = randint(1, 100)
Expand All @@ -701,6 +702,8 @@ async def test_at_least_one_running_filter(
recurrence=RecurrenceRule(),
type="TEST_TYPE",
)
_ = merge_strategy.identity(Dispatch(dispatch))

lifecycle = service.new_lifecycle_events_receiver("TEST_TYPE")
await client.create(**to_create_params(microgrid_id, dispatch))
await lifecycle.receive()
Expand Down
Loading
Loading