Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions src/frequenz/dispatch/_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@

import logging
from asyncio import Event
from typing import Any, Callable, Generator
from typing import Callable

from frequenz.channels import Receiver
from frequenz.client.dispatch import Client
from frequenz.sdk.actor import Actor
from frequenz.sdk.actor import Actor, BackgroundService
from typing_extensions import override

from ._actor_dispatcher import ActorDispatcher, DispatchInfo
from ._bg_service import DispatchScheduler, MergeStrategy
Expand All @@ -22,7 +23,7 @@
_logger = logging.getLogger(__name__)


class Dispatcher:
class Dispatcher(BackgroundService):
"""A highlevel interface for the dispatch API.

This class provides a highlevel interface to the dispatch API.
Expand Down Expand Up @@ -183,6 +184,8 @@ def __init__(
server_url: The URL of the dispatch service.
key: The key to access the service.
"""
super().__init__(name="Dispatcher")

self._client = Client(server_url=server_url, key=key)
self._bg_service = DispatchScheduler(
microgrid_id,
Expand All @@ -192,10 +195,32 @@ def __init__(
self._empty_event = Event()
self._empty_event.set()

@override
def start(self) -> None:
"""Start the local dispatch service."""
self._bg_service.start()

@property
@override
def is_running(self) -> bool:
"""Whether the local dispatch service is running."""
return self._bg_service.is_running

@override
async def wait(self) -> None:
"""Wait until all actor dispatches are stopped."""
await self._empty_event.wait()

@override
def cancel(self, msg: str | None = None) -> None:
"""Stop the local dispatch service."""
self._bg_service.cancel(msg)

for instance in self._actor_dispatchers.values():
instance.cancel()

self._actor_dispatchers.clear()

async def start_dispatching(
self,
dispatch_type: str,
Expand Down Expand Up @@ -256,10 +281,6 @@ async def stop_dispatching(self, dispatch_type: str) -> None:
if not self._actor_dispatchers:
self._empty_event.set()

def __await__(self) -> Generator[Any, None, bool]:
"""Wait until all actor dispatches are stopped."""
return self._empty_event.wait().__await__()

@property
def client(self) -> Client:
"""Return the client."""
Expand Down
Loading