Skip to content
48 changes: 17 additions & 31 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,47 +21,33 @@ The [`Dispatcher` class](https://frequenz-floss.github.io/frequenz-dispatch-pyth

```python
import os
from frequenz.dispatch import Dispatcher
from unittest.mock import MagicMock
from datetime import timedelta

from frequenz.dispatch import Dispatcher, DispatchInfo, MergeByType

async def create_actor(dispatch: DispatchInfo, receiver: Receiver[DispatchInfo]) -> Actor:
    """Build the actor for a dispatch (a mock stands in for your real actor here)."""
    actor = MagicMock(dispatch=dispatch, receiver=receiver)
    return actor

async def run():
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
key = os.getenv("DISPATCH_API_KEY", "some-key")

microgrid_id = 1

dispatcher = Dispatcher(
async with Dispatcher(
microgrid_id=microgrid_id,
server_url=url,
key=key
)
await dispatcher.start()

actor = MagicMock() # replace with your actor

changed_running_status_rx = dispatcher.new_running_state_event_receiver("MY_TYPE")

async for dispatch in changed_running_status_rx:
if dispatch.started:
print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
if actor.is_running:
actor.reconfigure(
components=dispatch.target,
run_parameters=dispatch.payload, # custom actor parameters
dry_run=dispatch.dry_run,
until=dispatch.until,
) # this will reconfigure the actor
else:
# this will start a new actor with the given components
# and run it for the duration of the dispatch
actor.start(
components=dispatch.target,
run_parameters=dispatch.payload, # custom actor parameters
dry_run=dispatch.dry_run,
until=dispatch.until,
)
else:
actor.stop() # this will stop the actor
key=key,
) as dispatcher:
await dispatcher.start_managing(
dispatch_type="EXAMPLE_TYPE",
actor_factory=create_actor,
merge_strategy=MergeByType(),
retry_interval=timedelta(seconds=10)
)

await dispatcher
```

## Supported Platforms
Expand Down
49 changes: 47 additions & 2 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,50 @@ This release introduces a more flexible and powerful mechanism for managing disp

## Upgrading

A new simplified way to manage actors has been introduced:

Change your code from:
```python
dispatcher = Dispatcher(
microgrid_id=microgrid_id,
server_url=url,
key=key
)
dispatcher.start()

status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")

managing_actor = ActorDispatcher(
actor_factory=MyActor.new_with_dispatch,
running_status_receiver=status_receiver,
)

await run(managing_actor)
```

to

```python
async with Dispatcher(
microgrid_id=microgrid_id,
server_url=url,
key=key
) as dispatcher:
await dispatcher.start_managing(
dispatch_type="EXAMPLE_TYPE",
actor_factory=MyActor.new_with_dispatch, # now async factory!
merge_strategy=MergeByType,
)
await dispatcher
```

Further changes:

* `Dispatcher.start` is no longer `async`. Remove `await` when calling it.
* Two properties have been replaced by methods that require a type as parameter.
* `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, dispatch_type: str)`.
* `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, merge_strategy: MergeStrategy)`.
* The managing actor constructor no longer requires the `dispatch_type` parameter. Instead you're expected to pass the type to the new-receiver function.
* The managing actor constructor no longer requires the `dispatch_type` parameter. Instead you're expected to pass the type to the new_receiver function.
* The `DispatchManagingActor` class has been renamed to `DispatchActorsService`.
* Its interface has been simplified and now only requires an actor factory and a running status receiver.
* It only starts/stops a single actor at a time now instead of a set of actors.
Expand All @@ -22,4 +61,10 @@ This release introduces a more flexible and powerful mechanism for managing disp

* A new feature "merge strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge consecutive and overlapping dispatch start/stop events of the same type. E.g. if dispatch `A` starts at 10:10 and ends at 10:30, and dispatch `B` starts at 10:30 and runs until 11:00, then with the feature enabled this would in total trigger one start event at 10:10, one reconfigure event at 10:30 and one stop event at 11:00.
* The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1800.
* Actor management with dispatches has been simplified. Calling `Dispatcher.start_dispatching(dispatch_type, actor_factory, merge_strategy)` will begin managing your actor for the given type and merge strategy. All you need provide is an actor factory. To stop dispatching for your type, call `Dispatcher.stop_dispatching(dispatch_type)`.
* Actor management with dispatches has been simplified:
* `Dispatcher.start_managing(dispatch_type, actor_factory, merge_strategy, retry_interval)` to manage your actor for the given type and merge strategy. All you need to provide is an actor factory.
* `Dispatcher.stop_managing(dispatch_type)` to stop dispatching for the given type.
* `Dispatcher.is_managed(dispatch_type)` to check if dispatching is active for the given type.
* Dispatches that failed to start will now be retried after a delay.
* A new method `Dispatcher.wait_for_initialization()` has been added to wait for all actors to be initialized.
* When using `async with Dispatcher(..) as dispatcher`, the dispatcher will first wait for the dispatch service to be initialized before entering the block.
127 changes: 103 additions & 24 deletions src/frequenz/dispatch/_actor_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
import logging
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any
from datetime import timedelta
from typing import Any, Awaitable

from frequenz.channels import Broadcast, Receiver
from frequenz.channels import Broadcast, Receiver, select
from frequenz.client.dispatch.types import TargetComponents
from frequenz.sdk.actor import Actor, BackgroundService

Expand Down Expand Up @@ -116,29 +117,77 @@ async def main():

microgrid_id = 1

dispatcher = Dispatcher(
async with Dispatcher(
microgrid_id=microgrid_id,
server_url=url,
key=key
)
dispatcher.start()

status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")
) as dispatcher:
status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")

managing_actor = ActorDispatcher(
actor_factory=MyActor.new_with_dispatch,
running_status_receiver=status_receiver,
)
managing_actor = ActorDispatcher(
actor_factory=MyActor.new_with_dispatch,
running_status_receiver=status_receiver,
)

await run(managing_actor)
await run(managing_actor)
```
"""

def __init__(
class RetryFailedDispatches:
"""Manages the retry of failed dispatches."""

def __init__(self, retry_interval: timedelta) -> None:
    """Set up the retry channel and the bookkeeping for pending retries.

    Args:
        retry_interval: How long to wait before a failed dispatch is
            re-sent on the retry channel.
    """
    retry_channel = Broadcast[Dispatch](name="retry_channel")

    self._retry_interval = retry_interval
    self._channel = retry_channel
    self._sender = retry_channel.new_sender()
    # Retry tasks that are currently in flight.
    self._tasks: set[asyncio.Task[None]] = set()

def new_receiver(self) -> Receiver[Dispatch]:
    """Open a receiver on the retry channel.

    Returns:
        A new receiver that yields the dispatches sent on the retry channel.
    """
    receiver = self._channel.new_receiver()
    return receiver

def retry(self, dispatch: Dispatch) -> None:
"""Retry a dispatch.

Args:
dispatch: The dispatch information to retry.
"""
task = asyncio.create_task(self._retry_after_delay(dispatch))
self._tasks.add(task)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now you need to finalize this object too, otherwise when the dispatcher that instantiates it is destroyed, all these tasks will be dangling until the garbage collection destroys them. My feeling is this class needs to end up being a BackgroundService too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those tasks only hang around for 60 seconds or so anyway; then they destroy themselves through the done callback below.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it is not too bad, but there could be situations on program termination where the loop emits warnings because there are pending tasks when the loop is stopped, especially in tests. So I agree it is not urgent, but eventually I would implement proper finalization for this, as most of the time that we say "meh, who cares about finalization", it eventually comes back to bite us :D

task.add_done_callback(self._tasks.remove)

async def _retry_after_delay(self, dispatch: Dispatch) -> None:
    """Wait for the retry interval, then re-send the dispatch.

    Args:
        dispatch: The dispatch to publish on the retry channel once the
            delay has elapsed.
    """
    interval = self._retry_interval
    _logger.info("Will retry dispatch %s after %s", dispatch.id, interval)

    await asyncio.sleep(interval.total_seconds())

    _logger.info("Retrying dispatch %s now", dispatch.id)
    await self._sender.send(dispatch)

def __init__( # pylint: disable=too-many-arguments, too-many-positional-arguments
self,
actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor],
actor_factory: Callable[
[DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor]
],
running_status_receiver: Receiver[Dispatch],
dispatch_identity: Callable[[Dispatch], int] | None = None,
retry_interval: timedelta | None = timedelta(seconds=60),
) -> None:
"""Initialize the dispatch handler.

Expand All @@ -148,6 +197,7 @@ def __init__(
running_status_receiver: The receiver for dispatch running status changes.
dispatch_identity: A function to identify to which actor a dispatch refers.
By default, it uses the dispatch ID.
retry_interval: The interval between retries. If `None`, retries are disabled.
"""
super().__init__()
self._dispatch_identity: Callable[[Dispatch], int] = (
Expand All @@ -161,6 +211,11 @@ def __init__(
name="dispatch_updates_channel", resend_latest=True
)
self._updates_sender = self._updates_channel.new_sender()
self._retrier = (
ActorDispatcher.RetryFailedDispatches(retry_interval)
if retry_interval
else None
)

def start(self) -> None:
"""Start the background service."""
Expand All @@ -174,7 +229,8 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
options=dispatch.payload,
)

actor: Actor | None = self._actors.get(self._dispatch_identity(dispatch))
identity = self._dispatch_identity(dispatch)
actor: Actor | None = self._actors.get(identity)

if actor:
sent_str = ""
Expand All @@ -189,21 +245,28 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
else:
try:
_logger.info("Starting actor for dispatch type %r", dispatch.type)
actor = self._actor_factory(
actor = await self._actor_factory(
dispatch_update,
self._updates_channel.new_receiver(limit=1, warn_on_overflow=False),
)
self._actors[self._dispatch_identity(dispatch)] = actor

actor.start()

except Exception as e: # pylint: disable=broad-except
_logger.error(
"Failed to start actor for dispatch type %r: %s",
"Failed to start actor for dispatch type %r",
dispatch.type,
e,
exc_info=True,
exc_info=e,
Comment on lines 256 to +259
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just use _logger.exception() instead of explicitly passing the exc_info.

)
if self._retrier:
self._retrier.retry(dispatch)
else:
_logger.error(
"No retry mechanism enabled, dispatch %r failed", dispatch
)
else:
# No exception occurred, so we can add the actor to the list
self._actors[identity] = actor

async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
"""Stop all actors.
Expand All @@ -212,17 +275,33 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
stopping_dispatch: The dispatch that is stopping the actor.
msg: The message to be passed to the actors being stopped.
"""
if actor := self._actors.pop(self._dispatch_identity(stopping_dispatch), None):
actor: Actor | None = None
identity = self._dispatch_identity(stopping_dispatch)

actor = self._actors.get(identity)

if actor:
await actor.stop(msg)

del self._actors[identity]
else:
_logger.warning(
"Actor for dispatch type %r is not running", stopping_dispatch.type
)
Comment on lines +281 to 290
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip:

Suggested change
actor = self._actors.get(identity)
if actor:
await actor.stop(msg)
del self._actors[identity]
else:
_logger.warning(
"Actor for dispatch type %r is not running", stopping_dispatch.type
)
if actor := self._actors.pop(identity, None):
await actor.stop(msg)
else:
_logger.warning(
"Actor for dispatch type %r is not running", stopping_dispatch.type
)


async def _run(self) -> None:
    """Run the background service.

    Consumes dispatches from the running-status receiver and, when a retrier
    is configured, also the dispatches the retrier re-sends on its retry
    channel. Both kinds of message are passed to `_handle_dispatch()`.
    """
    if not self._retrier:
        # Retries are disabled: only the regular dispatch stream is served.
        async for dispatch in self._dispatch_rx:
            await self._handle_dispatch(dispatch)
        return

    retry_recv = self._retrier.new_receiver()

    async for selected in select(retry_recv, self._dispatch_rx):
        if retry_recv.triggered(selected):
            # The retrier already slept for the retry interval before
            # sending this dispatch, so it must be handled now. Calling
            # `retry()` here would only queue it again and the actor start
            # would never be re-attempted. If the start fails once more,
            # `_start_actor()` schedules the next retry itself.
            await self._handle_dispatch(selected.message)
        elif self._dispatch_rx.triggered(selected):
            await self._handle_dispatch(selected.message)

async def _handle_dispatch(self, dispatch: Dispatch) -> None:
"""Handle a dispatch.
Expand Down
Loading
Loading