Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* Two properties have been replaced by methods that require a type as parameter.
* `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, dispatch_type: str)`.
* `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, unify_running_intervals: bool)`.
* The managing actor constructor no longer requires the `dispatch_type` parameter. Instead you're expected to pass the type to the new-receiver function.

## New Features

Expand Down
6 changes: 4 additions & 2 deletions src/frequenz/dispatch/_bg_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,16 +259,18 @@ async def _fetch(self) -> None:
self._dispatches[dispatch.id] = Dispatch(client_dispatch)
old_dispatch = old_dispatches.pop(dispatch.id, None)
if not old_dispatch:
_logger.info("New dispatch: %s", dispatch)
_logger.debug("New dispatch: %s", dispatch)
await self._update_dispatch_schedule_and_notify(dispatch, None)
await self._lifecycle_events_tx.send(Created(dispatch=dispatch))
elif dispatch.update_time != old_dispatch.update_time:
_logger.info("Updated dispatch: %s", dispatch)
_logger.debug("Updated dispatch: %s", dispatch)
await self._update_dispatch_schedule_and_notify(
dispatch, old_dispatch
)
await self._lifecycle_events_tx.send(Updated(dispatch=dispatch))

_logger.info("Received %s dispatches", len(self._dispatches))

except grpc.aio.AioRpcError as error:
_logger.error("Error fetching dispatches: %s", error)
self._dispatches = old_dispatches
Expand Down
20 changes: 7 additions & 13 deletions src/frequenz/dispatch/_managing_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@ async def _run(self) -> None:

def set_components(self, components: TargetComponents) -> None:
match components:
case [int(), *_] as component_ids:
print("Dispatch: Setting components to %s", components)
case []:
print("Dispatch: Using all components")
case list() as ids if isinstance(ids[0], int):
component_ids = ids
case [ComponentCategory.BATTERY, *_]:
print("Dispatch: Using all battery components")
component_category = ComponentCategory.BATTERY
case unsupported:
print(
"Dispatch: Requested an unsupported target component %r, "
"Dispatch: Requested an unsupported selector %r, "
"but only component IDs or category BATTERY are supported.",
unsupported,
)
Expand All @@ -91,11 +93,10 @@ async def run():
# Start actor and give it an dispatch updates channel receiver
my_actor = MyActor(dispatch_updates_channel.new_receiver())

status_receiver = dispatcher.running_status_change.new_receiver()
status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why wasn't this picked up by tests? 🤔


managing_actor = DispatchManagingActor(
actor=my_actor,
dispatch_type="EXAMPLE",
running_status_receiver=status_receiver,
updates_sender=dispatch_updates_channel.new_sender(),
)
Expand All @@ -107,15 +108,13 @@ async def run():
def __init__(
self,
actor: Actor | Set[Actor],
dispatch_type: str,
running_status_receiver: Receiver[Dispatch],
updates_sender: Sender[DispatchUpdate] | None = None,
) -> None:
"""Initialize the dispatch handler.

Args:
actor: A set of actors or a single actor to manage.
dispatch_type: The type of dispatches to handle.
running_status_receiver: The receiver for dispatch running status changes.
updates_sender: The sender for dispatch events
"""
Expand All @@ -124,7 +123,6 @@ def __init__(
self._actors: frozenset[Actor] = frozenset(
[actor] if isinstance(actor, Actor) else actor
)
self._dispatch_type = dispatch_type
self._updates_sender = updates_sender

def _start_actors(self) -> None:
Expand Down Expand Up @@ -158,10 +156,6 @@ async def _handle_dispatch(self, dispatch: Dispatch) -> None:
Args:
dispatch: The dispatch to handle.
"""
if dispatch.type != self._dispatch_type:
_logger.debug("Ignoring dispatch %s", dispatch.id)
return

if dispatch.started:
if self._updates_sender is not None:
_logger.info("Updated by dispatch %s", dispatch.id)
Expand Down
1 change: 0 additions & 1 deletion tests/test_mananging_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ async def test_env() -> AsyncIterator[TestEnv]:

runner_actor = DispatchManagingActor(
actor=actor,
dispatch_type="UNIT_TEST",
running_status_receiver=channel.new_receiver(),
updates_sender=updates_channel.new_sender(),
)
Expand Down
Loading