2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
@@ -14,7 +14,7 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
* A new method `Dispatcher.refresh()` was added for cases where you need to force a refresh of the dispatcher's state.

## Bug Fixes

4 changes: 4 additions & 0 deletions src/frequenz/dispatch/_dispatcher.py
@@ -213,6 +213,10 @@ async def start(self) -> None:
"""Start the actor."""
self._actor.start()

async def refresh(self) -> None:
"""Re-fetch all dispatches."""
await self._actor.fetch()
Contributor:
I don't think this is a good idea design-wise. The idea is to only communicate with actors via channels once they are created. Maybe for this case that is overkill, at least until we can really distribute actors across other processes/nodes; if it ever comes to that, OK. But for now I would leave it as a private method and just add an ignore and a comment saying this is a hack.
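For illustration, a minimal sketch of that channel-based pattern, assuming the `frequenz.channels` `Broadcast` API (the channel name and the wiring are hypothetical, not part of this PR):

```python
from frequenz.channels import Broadcast

# Hypothetical command channel owned by the Dispatcher; the actor would be
# handed a receiver at construction time instead of exposing a public fetch().
_refresh_channel: Broadcast[None] = Broadcast(name="dispatch-refresh")
_refresh_receiver = _refresh_channel.new_receiver()


async def refresh() -> None:
    """Ask the actor to re-fetch, without calling into it directly."""
    await _refresh_channel.new_sender().send(None)
```

Inside the actor's `_run()` loop, selecting over the gRPC stream and `_refresh_receiver` would then trigger the re-fetch.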

@ela-kotulska-frequenz (Contributor), Dec 3, 2024:
Right, thanks! My use case:
I have an actor that uses the Dispatcher.
When the Dispatcher starts (for the first time), it sends all dispatches and I receive them.

If my actor crashes and restarts, the latest dispatches are not sent. This is understandable, because the dispatcher doesn't know that I restarted.
Another solution would be to create a new dispatcher on every restart. Would that be OK?
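For illustration, re-creating it could look roughly like this (a sketch; `make_dispatcher` is a hypothetical app-provided factory, since the `Dispatcher` constructor is not shown in this diff):

```python
async def restart_consumer(make_dispatcher) -> None:
    # Build a fresh Dispatcher so the initial fetch runs again and the
    # restarted consumer receives Created events for all existing dispatches.
    dispatcher = make_dispatcher()
    await dispatcher.start()
    # ... re-subscribe to the lifecycle and running-state channels here ...
```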

Contributor:

Just to clarify, as you commented in this review comment, I'm only referring to the call to the actor inside refresh(); adding refresh() itself is OK, because Dispatcher is just a class that uses an actor underneath (and the class instance should always be local; only the actor might be in another process/node).

That said, yeah, I think if it crashes it should be OK to re-create it; that shouldn't happen all the time anyway.

But I don't oppose this addition; things could go wrong, and having a way to force a refresh could still be useful in some cases. Maybe we should add some sort of warning, though, if refresh() is not really intended to be used under normal conditions, just to avoid confusion or people thinking they need to actively refresh the dispatcher instance.
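Something like this sketch could do it (assuming a module-level `_logger`; the wording and the docstring `Warning:` section are just a suggestion, not part of the PR):

```python
async def refresh(self) -> None:
    """Re-fetch all dispatches.

    Warning:
        This is normally not needed: the dispatcher keeps itself up to date
        through the service's update stream. Only use it to force a full
        re-sync after something has gone wrong.
    """
    _logger.warning("Dispatcher.refresh() called; this is rarely necessary.")
    await self._actor.fetch()
```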

Contributor:

BTW, the dispatcher should be instantiated by the app, not an actor, right? We should have only one dispatcher for the whole app/all actors, no?

@Marenz (Contributor, Author), Dec 3, 2024:

> BTW, the dispatcher should be instantiated by the app, not an actor, right? We should have only one dispatcher for the whole app/all actors, no?

good point, yes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I will rethink and redesign my use case... sorry!


@property
def client(self) -> Client:
"""Return the client."""
88 changes: 51 additions & 37 deletions src/frequenz/dispatch/actor.py
@@ -22,6 +22,7 @@
"""The logger for this module."""


# pylint: disable=too-many-instance-attributes
class DispatchingActor(Actor):
"""Dispatch actor.

@@ -84,12 +85,14 @@ def __init__(
always at index 0.
"""

self._currently_fetching = False

async def _run(self) -> None:
"""Run the actor."""
_logger.info("Starting dispatch actor for microgrid %s", self._microgrid_id)

# Initial fetch
await self._fetch()
await self.fetch()

stream = self._client.stream(microgrid_id=self._microgrid_id)

@@ -151,7 +154,7 @@ async def _execute_scheduled_event(self, dispatch: Dispatch) -> None:

self._update_timer()

async def _fetch(self) -> None:
async def fetch(self) -> None:
"""Fetch all relevant dispatches using list.

This is used for the initial fetch and for re-fetching all dispatches
@@ -160,43 +163,54 @@ async def _fetch(self) -> None:
old_dispatches = self._dispatches
self._dispatches = {}

try:
_logger.info("Fetching dispatches for microgrid %s", self._microgrid_id)
async for page in self._client.list(microgrid_id=self._microgrid_id):
for client_dispatch in page:
dispatch = Dispatch(client_dispatch)

self._dispatches[dispatch.id] = Dispatch(client_dispatch)
old_dispatch = old_dispatches.pop(dispatch.id, None)
if not old_dispatch:
_logger.info("New dispatch: %s", dispatch)
await self._update_dispatch_schedule_and_notify(dispatch, None)
await self._lifecycle_updates_sender.send(
Created(dispatch=dispatch)
)
elif dispatch.update_time != old_dispatch.update_time:
_logger.info("Updated dispatch: %s", dispatch)
await self._update_dispatch_schedule_and_notify(
dispatch, old_dispatch
)
await self._lifecycle_updates_sender.send(
Updated(dispatch=dispatch)
)

except grpc.aio.AioRpcError as error:
_logger.error("Error fetching dispatches: %s", error)
self._dispatches = old_dispatches
if self._currently_fetching:
_logger.debug("Already fetching dispatches, skipping")
return

for dispatch in old_dispatches.values():
_logger.info("Deleted dispatch: %s", dispatch)
await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch))
await self._update_dispatch_schedule_and_notify(None, dispatch)

# Set deleted only here as it influences the result of dispatch.running()
            # which is used above in _running_state_change
dispatch._set_deleted() # pylint: disable=protected-access
await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch))
try:
self._currently_fetching = True

try:
_logger.info("Fetching dispatches for microgrid %s", self._microgrid_id)
async for page in self._client.list(microgrid_id=self._microgrid_id):
for client_dispatch in page:
dispatch = Dispatch(client_dispatch)

self._dispatches[dispatch.id] = Dispatch(client_dispatch)
old_dispatch = old_dispatches.pop(dispatch.id, None)
if not old_dispatch:
_logger.info("New dispatch: %s", dispatch)
await self._update_dispatch_schedule_and_notify(
dispatch, None
)
await self._lifecycle_updates_sender.send(
Created(dispatch=dispatch)
)
elif dispatch.update_time != old_dispatch.update_time:
_logger.info("Updated dispatch: %s", dispatch)
await self._update_dispatch_schedule_and_notify(
dispatch, old_dispatch
)
await self._lifecycle_updates_sender.send(
Updated(dispatch=dispatch)
)

except grpc.aio.AioRpcError as error:
_logger.error("Error fetching dispatches: %s", error)
self._dispatches = old_dispatches
return

for dispatch in old_dispatches.values():
_logger.info("Deleted dispatch: %s", dispatch)
await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch))
await self._update_dispatch_schedule_and_notify(None, dispatch)

# Set deleted only here as it influences the result of dispatch.running()
                # which is used above in _running_state_change
dispatch._set_deleted() # pylint: disable=protected-access
await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch))
finally:
self._currently_fetching = False

async def _update_dispatch_schedule_and_notify(
self, dispatch: Dispatch | None, old_dispatch: Dispatch | None
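Taken together, the new public surface is small: `Dispatcher.refresh()` delegates to the actor's now-public `fetch()`, which the new `_currently_fetching` flag guards against overlapping runs. A usage sketch, assuming a `dispatcher` instance constructed elsewhere in the app (constructor arguments omitted):

```python
async def force_resync(dispatcher) -> None:
    # Forces a full re-fetch from the service. Concurrent calls are safe:
    # the actor skips the fetch if one is already in flight.
    await dispatcher.refresh()
```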