Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ This release introduces a more flexible and powerful mechanism for managing disp

## Upgrading

* `Dispatcher.start` is no longer `async`. Remove `await` when calling it.
* Two properties have been replaced by methods that require a type as parameter.
* `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, dispatch_type: str)`.
* `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, unify_running_intervals: bool)`.
* `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, merge_strategy: MergeStrategy)`.
* The managing actor constructor no longer requires the `dispatch_type` parameter. Instead you're expected to pass the type to the new-receiver function.
* The `DispatchManagingActor` class has been renamed to `DispatchActorsService`.
* It's interface has been simplified and now only requires an actor factory and a running status receiver.
* It only supports a single actor at a time now.
* It only starts/stops a single actor at a time now instead of a set of actors.
* Refer to the updated [usage example](https://frequenz-floss.github.io/frequenz-dispatch-python/latest/reference/frequenz/dispatch/#frequenz.dispatch.DispatchActorsService) for more information.
* `DispatchUpdate` was renamed to `DispatchInfo`.

## New Features

* A new feature "merger strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.

* A new feature "merge strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
* The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1600.

* Actor management with dispatches has been simplified. Calling `Dispatcher.start_dispatching(dispatch_type, actor_factory, merge_strategy)` will begin managing your actor for the given type and merge strategy. All you need provide is an actor factory. To stop dispatching for your type, call `Dispatcher.stop_dispatching(dispatch_type)`.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ dependencies = [
# mkdocs.yml file when changing the version here (look for the config key
# plugins.mkdocstrings.handlers.python.import)
"frequenz-sdk >= 1.0.0-rc1302, < 1.0.0-rc1600",
"frequenz-channels >= 1.3.0, < 2.0.0",
"frequenz-channels >= 1.6.1, < 2.0.0",
"frequenz-client-dispatch >= 0.8.4, < 0.9.0",
]
dynamic = ["version"]
Expand Down
48 changes: 34 additions & 14 deletions src/frequenz/dispatch/_actor_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class ActorDispatcher(BackgroundService):
import os
import asyncio
from typing import override
from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchInfo
from frequenz.dispatch import Dispatcher, ActorDispatcher, DispatchInfo
from frequenz.client.dispatch.types import TargetComponents
from frequenz.client.common.microgrid.components import ComponentCategory
from frequenz.channels import Receiver, Broadcast, select, selected_from
Expand Down Expand Up @@ -125,7 +125,7 @@ async def main():

status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")

managing_actor = DispatchManagingActor(
managing_actor = ActorDispatcher(
actor_factory=MyActor.new_with_dispatch,
running_status_receiver=status_receiver,
)
Expand All @@ -138,18 +138,25 @@ def __init__(
self,
actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor],
running_status_receiver: Receiver[Dispatch],
dispatch_identity: Callable[[Dispatch], int] | None = None,
) -> None:
"""Initialize the dispatch handler.

Args:
actor_factory: A callable that creates an actor with some initial dispatch
information.
running_status_receiver: The receiver for dispatch running status changes.
dispatch_identity: A function to identify to which actor a dispatch refers.
By default, it uses the dispatch ID.
"""
super().__init__()
self._dispatch_identity: Callable[[Dispatch], int] = (
dispatch_identity if dispatch_identity else lambda d: d.id
)

self._dispatch_rx = running_status_receiver
self._actor_factory = actor_factory
self._actor: Actor | None = None
self._actors: dict[int, Actor] = {}
self._updates_channel = Broadcast[DispatchInfo](
name="dispatch_updates_channel", resend_latest=True
)
Expand All @@ -167,22 +174,36 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
options=dispatch.payload,
)

if self._actor:
actor: Actor | None = self._actors.get(self._dispatch_identity(dispatch))

if actor:
sent_str = ""
if self._updates_sender is not None:
sent_str = ", sent a dispatch update instead of creating a new actor"
await self._updates_sender.send(dispatch_update)
_logger.warning(
_logger.info(
Comment on lines -175 to +184
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't get this change, the commit message says " Don't warn for changing dispatch parameters: It's a normal action ", does this mean that _start_actor is called every time a dispatch is updated? If so, I would even make this log a debug or remove it completely, because it seems misleading, as it was never the intention to start the actor if it was just a dispatch update.

If there is a way to tell if this is called because a dispatch just started or was updated, then maybe we can log more meaningful messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, there is a way, but it's not available to the actor_dispatcher (without restructuring).
The idea is that the dispatch instance tells us the desired state, no matter the previous state, so from that perspectives it doesn't matter whether it was an update or a new dispatch, both cases should do both, start or update a running actor instance..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would make this a debug then, because it is completely normal and probably not very useful to get that info when things are running. But not hung to block this PR on.

"Actor for dispatch type %r is already running%s",
dispatch.type,
sent_str,
)
else:
_logger.info("Starting actor for dispatch type %r", dispatch.type)
self._actor = self._actor_factory(
dispatch_update, self._updates_channel.new_receiver()
)
self._actor.start()
try:
_logger.info("Starting actor for dispatch type %r", dispatch.type)
actor = self._actor_factory(
dispatch_update,
self._updates_channel.new_receiver(limit=1, warn_on_overflow=False),
)
self._actors[self._dispatch_identity(dispatch)] = actor

actor.start()

except Exception as e: # pylint: disable=broad-except
_logger.error(
"Failed to start actor for dispatch type %r: %s",
dispatch.type,
e,
exc_info=True,
)
Comment on lines +190 to +206
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not for this PR, as it is a separate feature that can be implemented separately, but I think it might be good to do the starting in a background task and retrying forever like the edge app "dispatch subsystem" does (maybe with the new dispatcher we can even get rid of the dispatch subsystem).

Not 100% sure, as doing this is making maybe too many assumptions about what clients want, but maybe it could also be opt-in/out by passing an option like in_background: bool, retries: int | INFINITE).


async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
"""Stop all actors.
Expand All @@ -191,13 +212,12 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
stopping_dispatch: The dispatch that is stopping the actor.
msg: The message to be passed to the actors being stopped.
"""
if self._actor is None:
if actor := self._actors.pop(self._dispatch_identity(stopping_dispatch), None):
await actor.stop(msg)
else:
_logger.warning(
"Actor for dispatch type %r is not running", stopping_dispatch.type
)
else:
await self._actor.stop(msg)
self._actor = None

async def _run(self) -> None:
"""Wait for dispatches and handle them."""
Expand Down
102 changes: 100 additions & 2 deletions src/frequenz/dispatch/_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,27 @@

"""A highlevel interface for the dispatch API."""

from __future__ import annotations

import logging
from asyncio import Event
from typing import Callable

from frequenz.channels import Receiver
from frequenz.client.dispatch import Client
from frequenz.sdk.actor import Actor, BackgroundService
from typing_extensions import override

from ._actor_dispatcher import ActorDispatcher, DispatchInfo
from ._bg_service import DispatchScheduler, MergeStrategy
from ._dispatch import Dispatch
from ._event import DispatchEvent
from ._merge_strategies import MergeByIdentity

_logger = logging.getLogger(__name__)


class Dispatcher:
class Dispatcher(BackgroundService):
"""A highlevel interface for the dispatch API.

This class provides a highlevel interface to the dispatch API.
Expand Down Expand Up @@ -173,16 +184,103 @@ def __init__(
server_url: The URL of the dispatch service.
key: The key to access the service.
"""
super().__init__(name="Dispatcher")

self._client = Client(server_url=server_url, key=key)
self._bg_service = DispatchScheduler(
microgrid_id,
self._client,
)
self._actor_dispatchers: dict[str, ActorDispatcher] = {}
self._empty_event = Event()
self._empty_event.set()

async def start(self) -> None:
@override
def start(self) -> None:
"""Start the local dispatch service."""
self._bg_service.start()

@property
@override
def is_running(self) -> bool:
"""Whether the local dispatch service is running."""
return self._bg_service.is_running

@override
async def wait(self) -> None:
"""Wait until all actor dispatches are stopped."""
await self._empty_event.wait()

@override
def cancel(self, msg: str | None = None) -> None:
"""Stop the local dispatch service."""
self._bg_service.cancel(msg)

for instance in self._actor_dispatchers.values():
instance.cancel()

self._actor_dispatchers.clear()

async def start_dispatching(
self,
dispatch_type: str,
*,
actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor],
merge_strategy: MergeByIdentity | None = None,
) -> None:
"""Manage actors for a given dispatch type.

Creates and manages an ActorDispatcher for the given type that will
start, stop and reconfigure actors based on received dispatches.

You can await the `Dispatcher` instance to block until all types
registered with `start_dispatching()` are stopped using
`stop_dispatching()`

Args:
dispatch_type: The type of the dispatch to manage.
actor_factory: The factory to create actors.
merge_strategy: The strategy to merge running intervals.
"""
dispatcher = self._actor_dispatchers.get(dispatch_type)

if dispatcher is not None:
_logger.debug(
"Ignoring duplicate actor dispatcher request for %r", dispatch_type
)
return

self._empty_event.clear()

def id_identity(dispatch: Dispatch) -> int:
return dispatch.id

dispatcher = ActorDispatcher(
actor_factory=actor_factory,
running_status_receiver=await self.new_running_state_event_receiver(
dispatch_type, merge_strategy=merge_strategy
),
dispatch_identity=(
id_identity if merge_strategy is None else merge_strategy.identity
),
)

self._actor_dispatchers[dispatch_type] = dispatcher
dispatcher.start()

async def stop_dispatching(self, dispatch_type: str) -> None:
"""Stop managing actors for a given dispatch type.

Args:
dispatch_type: The type of the dispatch to stop managing.
"""
dispatcher = self._actor_dispatchers.pop(dispatch_type, None)
if dispatcher is not None:
await dispatcher.stop()

if not self._actor_dispatchers:
self._empty_event.set()

@property
def client(self) -> Client:
"""Return the client."""
Expand Down
2 changes: 1 addition & 1 deletion src/frequenz/dispatch/_merge_strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,4 @@ class MergeByTypeTarget(MergeByType):
@override
def identity(self, dispatch: Dispatch) -> int:
"""Identity function for the merge criteria."""
return hash((dispatch.type, dispatch.target))
return hash((dispatch.type, tuple(dispatch.target)))
5 changes: 4 additions & 1 deletion tests/test_frequenz_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
Deleted,
Dispatch,
DispatchEvent,
MergeByIdentity,
MergeByType,
MergeByTypeTarget,
MergeStrategy,
Expand Down Expand Up @@ -678,7 +679,7 @@ async def test_multiple_dispatches_sequential_intervals_merge(
async def test_at_least_one_running_filter(
fake_time: time_machine.Coordinates,
generator: DispatchGenerator,
merge_strategy: MergeStrategy,
merge_strategy: MergeByIdentity,
) -> None:
"""Test scenarios directly tied to the _at_least_one_running logic."""
microgrid_id = randint(1, 100)
Expand All @@ -701,6 +702,8 @@ async def test_at_least_one_running_filter(
recurrence=RecurrenceRule(),
type="TEST_TYPE",
)
_ = merge_strategy.identity(Dispatch(dispatch))

lifecycle = service.new_lifecycle_events_receiver("TEST_TYPE")
await client.create(**to_create_params(microgrid_id, dispatch))
await lifecycle.receive()
Expand Down
Loading
Loading