
Conversation

Contributor

@Marenz Marenz commented Feb 18, 2025

  • Add method to check if a type is currently being dispatched
  • Make actor_factory function async for more flexibility
  • Add parameter autostart to Dispatcher() service
  • Add retry delay and auto restart to ActorService
  • Update Docs, Readme, Release Notes
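
For orientation, a rough sketch of how these changes fit together. It only uses names that appear elsewhere in this PR (`start_dispatching`, `is_dispatching`, `MyActor.new_with_dispatch`) and is not the exact merged API; the import path is assumed.

```python
# Illustrative sketch only, assembled from names used elsewhere in this PR;
# not the final frequenz-dispatch API.
from frequenz.dispatch import Dispatcher  # import path assumed


class MyActor:
    """Placeholder actor; a real one would subclass the SDK's Actor."""

    def __init__(self, dispatch) -> None:
        self.dispatch = dispatch

    @classmethod
    async def new_with_dispatch(cls, dispatch) -> "MyActor":
        # Factories may now be async, as described above.
        return cls(dispatch)


async def example(dispatcher: Dispatcher) -> None:
    await dispatcher.start_dispatching(
        dispatch_type="EXAMPLE_TYPE",
        actor_factory=MyActor.new_with_dispatch,
    )
    # New method to check whether a type is currently being dispatched.
    if dispatcher.is_dispatching("EXAMPLE_TYPE"):
        ...
```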

@Marenz Marenz requested a review from a team as a code owner February 18, 2025 14:24
@github-actions github-actions bot added the part:docs, part:tests and part:dispatcher labels Feb 18, 2025
@Marenz Marenz requested a review from llucax February 18, 2025 17:01
Contributor

@llucax llucax left a comment

Looks good in general but I would remove the autostart and change examples to use the dispatcher as a context manager. Also IMHO it needs clarification of how the retry interval works.

I would also really think about the implications of (re)creating the actor each time a dispatch arrives, just to make sure we are not missing anything important, but maybe we can delay this until after the release; I guess it won't be such a huge impact if we change it in the short term.

-exc_info=True,
+exc_info=e,
 )
 self._start_failed_dispatches[identity] = dispatch
Contributor

OK, so this is another difference from how we are managing things in the app: here actors are created and removed each time a dispatch comes in, instead of preserving the instance and just starting/stopping the actor with dispatches.

I'm still not sure which approach is best, and I'm starting to think it may be worth considering this a bit more before deciding.

Contributor Author

Marenz commented Feb 24, 2025

  • Removed autostart
  • Made examples use async with
  • Added "wait for initial dispatch with timeout" feature
  • Throw when trying to get a receiver while the service is not running
  • Added optional persistent actor setting/parameter for actor dispatch management
  • Added a proper retry system
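
A minimal sketch of the context-manager usage described above; this is illustrative, not the exact merged API.

```python
# Illustrative only: the "async with" usage described above. `dispatcher` is an
# already-constructed Dispatcher and `actor_factory` any (now possibly async)
# factory, e.g. MyActor.new_with_dispatch from the release-notes example below.
async def run(dispatcher, actor_factory) -> None:
    async with dispatcher:  # entering the context starts the dispatching service
        await dispatcher.start_dispatching(
            dispatch_type="EXAMPLE_TYPE",
            actor_factory=actor_factory,
        )
        ...  # keep running; actors are started/stopped as dispatches arrive
```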

RELEASE_NOTES.md Outdated
* Actor management with dispatches has been simplified:
  * `Dispatcher.start_dispatching(dispatch_type, actor_factory, merge_strategy)` to manage your actor for the given type and merge strategy. All you need to provide is an actor factory.
  * `Dispatcher.stop_dispatching(dispatch_type)` to stop dispatching for the given type.
  * `Dispatcher.is_dispatching(dispatch_type)` to check if dispatching is active for the given type.
Contributor Author

TODO need to rename this to is_managed

Contributor

@llucax llucax left a comment

To be honest I'm less happy with this iteration than with the previous one.

  • I don't quite get why the new initial_fetch_timeout is needed. Can it normally happen that it takes too long? Is it a last-resort thing? I'm not sure why this has to be done at this level; isn't it the user who should wrap the initialization in a timeout() if they don't want to wait too long? Wouldn't it make more sense to provide a method like await dispatcher.initialization_finished()?

  • I find the retry logic extremely convoluted. Why can't we just have one task with a loop like the one in the edge app? Right now, to figure out how the retry works, I need to look into 3 different places; the retry logic is very spread out.

  • I don't think it is a good idea to have 2 modes now, persistent and non-persistent; it also adds a lot of complexity for something we don't know we'll need yet. It was just a comment about something to look into in the future.

  • Nitpick: commit message style: "Dispatches that failed to start will now be retried after a delay." ends with a period and is not in the imperative mood (e.g. "Retry dispatches that failed to start after a delay"). Also, what does it mean that a "dispatch failed to start"? I guess it is the actor that failed to start? But it is not entirely clear to me. Sometimes a one-line title is not enough. Also, the commit in the previous point could answer all the questions I have there.

I really find this iteration much harder to read and follow, and I don't see us gaining much from the added complexity. 😟 And for me the previous one was practically ready to be merged. Maybe that wasn't obvious from reading my comments, so sorry about that.

I think in the future we should try to merge more quickly and iterate over the merged stuff instead, otherwise it can get a bit tiring to iterate over and over the same PR but with the code changing radically.
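
For reference, a rough sketch of the kind of single-loop retry being referred to; the failed_dispatches mapping, start_actor callable and delay value are assumptions for illustration, not code from this PR or the edge app.

```python
import asyncio
import logging

_logger = logging.getLogger(__name__)

RETRY_DELAY = 60.0  # seconds; placeholder value


async def retry_loop(failed_dispatches: dict, start_actor) -> None:
    """Single task retrying all failed dispatches in one place (sketch).

    `failed_dispatches` maps an identity to the dispatch whose actor failed to
    start, and `start_actor` is an async callable creating/starting the actor.
    Both names are assumptions for this sketch.
    """
    while True:
        await asyncio.sleep(RETRY_DELAY)
        for identity, dispatch in list(failed_dispatches.items()):
            try:
                await start_actor(dispatch)
            except Exception:  # pylint: disable=broad-except
                _logger.exception("Retry failed for dispatch type %r", dispatch.type)
            else:
                del failed_dispatches[identity]
```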

 _logger.error(
-    "Failed to start actor for dispatch type %r: %s",
+    "Failed to start actor for dispatch type %r",
     dispatch.type,
-    e,
-    exc_info=True,
+    exc_info=e,
Contributor

You can just use _logger.exception() instead of explicitly passing the exc_info.
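
For illustration, the equivalent call with `logging.exception()` (standard library behavior; `actor_factory` and `dispatch` are placeholders here):

```python
import logging

_logger = logging.getLogger(__name__)


async def start_actor_logged(actor_factory, dispatch) -> None:
    """Sketch: the same log call expressed with logging.exception()."""
    try:
        await actor_factory(dispatch)
    except Exception:  # pylint: disable=broad-except
        # logging.exception() logs at ERROR level and attaches the active
        # exception's traceback automatically, so no explicit exc_info is needed.
        _logger.exception(
            "Failed to start actor for dispatch type %r", dispatch.type
        )
```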

Contributor Author

Marenz commented Feb 27, 2025

I don't quite get why the new initial_fetch_timeout is needed. Can it normally happen that it takes too long? Is it a last-resort thing? I'm not sure why this has to be done at this level; isn't it the user who should wrap the initialization in a timeout() if they don't want to wait too long? Wouldn't it make more sense to provide a method like await dispatcher.initialization_finished()?

That part was added after I started using the with() construct: I noticed that it doesn't wait at all after starting and immediately tries to create the receiver, at which point no dispatches are available yet. So the default size of 30 (in this case) is used for the receiver queue, and we can't take advantage of knowing how many dispatches there are to size the queue accordingly.

As a result, a normal startup would emit a warning for microgrids with more than 30 dispatches, and I didn't like that default behavior.

I don't think it is a good idea to have 2 modes now, persistent and non-persistent; it also adds a lot of complexity for something we don't know we'll need yet. It was just a comment about something to look into in the future.

I am sorry, but it's a fact that we need it. At least the non-persistent behavior (for Sahas' power setting actor). And in my opinion the changes are quite minimal and non-invasive.

I find the retry logic extremely convoluted. Why can't we just have one task with a loop like the one in the edge app? Right now, to figure out how the retry works, I need to look into 3 different places; the retry logic is very spread out.

"Very spread out"? Three interaction points don't seem to match that statement... and the class itself is just 300 lines, so I don't feel this puts the total complexity above an acceptable level either. You can't spread things a lot in 300 lines :P
Anyway, all the logic is in the extra subclass with just three interaction points; that seems better to me, not worse?

I don't see us gaining much from the added complexity.

Imo, we're gaining a lot.

  • Regarding persistence: We have at least one actor type that needs the non-persistent behavior. I added options so both are possible. Imo this is the best approach, because that way we can evaluate easily, everyone at their own pace. And it unblocks Sahas to integrate the power_setting actor.

Contributor

llucax commented Feb 27, 2025

That part was added after I started using the with() construct: I noticed that it doesn't wait at all after starting and immediately tries to create the receiver, at which point no dispatches are available yet. So the default size of 30 (in this case) is used for the receiver queue, and we can't take advantage of knowing how many dispatches there are to size the queue accordingly.

OK, so I think it can then make sense to have some async def wait_for_initialization() or something like that, and make __aenter__ call both start() and then await wait_for_initialization(), right?
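
A small sketch of that idea; the class and attribute names here are assumptions, not the actual Dispatcher implementation.

```python
import asyncio


class DispatcherSketch:
    """Simplified sketch of the idea above; not the real Dispatcher class."""

    def __init__(self) -> None:
        self._initialized = asyncio.Event()
        self._task: asyncio.Task | None = None

    def start(self) -> None:
        """Start the background task fetching dispatches."""
        self._task = asyncio.create_task(self._run())

    async def _run(self) -> None:
        # ... fetch the initial list of dispatches here (omitted) ...
        self._initialized.set()  # signal that initialization finished
        # ... keep processing dispatch events ...

    async def wait_for_initialization(self) -> None:
        """Block until the initial dispatches have been fetched."""
        await self._initialized.wait()

    async def __aenter__(self) -> "DispatcherSketch":
        self.start()
        await self.wait_for_initialization()
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        if self._task is not None:
            self._task.cancel()
```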

I am sorry, but it's a fact that we need it.

Do we? Why? Always removing and re-creating the actor should always work, right? Re-using the actor is sort of just an optimization.

At least the non-persistent behavior (for Sahas' power setting actor).

Yeah, so the mode that was previously implemented (always delete the actor when it is stopped) is the one that needs to be supported no matter what. The other mode, IMHO, is not necessary; it can be implemented transparently in the future, deleting an actor only when the dispatch is completely done (it is done and has no recurrence rules, or it has recurrence rules but there are no more upcoming events). In principle, I don't think we need a mode that keeps the actor alive always, forever. So I think we shouldn't do futurology and implement stuff we don't need yet, or don't know will be needed, as it adds development, maintenance and learning effort.
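
A hypothetical illustration of that deletion rule; the `next_run_after` name is made up for this sketch and may not match the real Dispatch type.

```python
from datetime import datetime


def dispatch_finished(dispatch, now: datetime) -> bool:
    """Return True if this dispatch will never run again (sketch).

    Covers both cases above: a finished one-shot dispatch has no next run, and a
    recurring dispatch whose recurrence has ended has no next run either. Only
    then would the managed actor actually be deleted.
    """
    return dispatch.next_run_after(now) is None  # hypothetical method
```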

And in my opinion the changes are quite minimal and non-invasive.

Maybe in terms of code, but I think it also adds complexity in terms of interface: now users need to think about this, and if we want to do a 1.0 soon(ish), it means that if it was a mistake, we need to keep maintaining it.

"very spread out"' three interaction points seem not to match that statement.. and the class itself is just 300 lines, so I don't feel this puts the total complexity above acceptable either. You can't spread things a lot in 300 lines :P
Anyway, all the logic is in the extra subclass with just three interaction points, that seems better to me, not worse?

3 places is three times more than 1, is a 300% increase. So yeah, very :trollface:

Jokes aside, I still don't get why the "one simple loop" approach we have in the edge app is not sufficient here. I'm not all that worried about adding complexity, just about adding unnecessary complexity.

Imo, we're gaining a lot.

  • Regarding persistence: We have at least one actor type that needs the non-persistent behavior. I added options so both are possible. Imo this is the best approach, because that way we can evaluate easily, everyone at their own pace. And it unblocks Sahas to integrate the power_setting actor.

But again, the point was never to remove "Sahas mode", so we only gained a mode that we don't even know we need at all (and my initial hunch is that we don't, which makes me even more hesitant about adding it).

Even more, if we had merged the previous iteration by just removing autostart, Sahas would already be unblocked, but now he is still blocked. Oh dear, the Sahas dilemma 😆

Anyway, I'd much prefer to merge the previous iteration than this one, but if you feel too strongly about reverting, I can live with it, as long as the bug about the dangling task is fixed. Then we can change stuff in follow-up PRs.

llucax previously approved these changes Mar 3, 2025
Contributor

@llucax llucax left a comment

Even though there are a couple of comments that I think should probably be addressed before releasing, at this point I would much rather merge as is than keep iterating, as each time I kind of need to review the PR entirely when only a few parts have changed. It is much easier and quicker to review a new PR instead.

RELEASE_NOTES.md Outdated
await dispatcher.start_dispatching(
    dispatch_type="EXAMPLE_TYPE",
    actor_factory=MyActor.new_with_dispatch,  # now async factory!
    merge_strategy=MergeStrategy.MergeByType,
Contributor

Is this right? Shouldn't it be just MergeByType?

        dispatch: The dispatch information to retry.
    """
    task = asyncio.create_task(self._retry_after_delay(dispatch))
    self._tasks.add(task)
Contributor

Now you need to finalize this object too; otherwise, when the dispatcher that instantiates it is destroyed, all these tasks will be left dangling until garbage collection destroys them. My feeling is this class needs to end up being a BackgroundService too.

Contributor Author

Those tasks only hang around for 60 seconds or so anyway; then they destroy themselves through the done callback below.

Contributor

I agree it is not too bad, but on program termination there can be situations where the event loop emits warnings about pending tasks when it is stopped, especially in tests. So I agree it is not urgent, but eventually I would implement proper finalization for this, as most of the time we say "meh, who cares about finalization", it eventually comes back to bite us :D
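
A sketch of the kind of finalization meant here, based on the snippet above: keep the done callback for self-cleanup, but also cancel whatever is still pending on stop. Names are placeholders; in practice this could be done by making the class a BackgroundService.

```python
import asyncio


class RetryManagerSketch:
    """Simplified sketch of explicit finalization for the retry tasks."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task] = set()

    def schedule_retry(self, coro) -> None:
        """Schedule a retry coroutine and keep a reference to its task."""
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        # Drop the reference once the task finishes (the "done callback" above).
        task.add_done_callback(self._tasks.discard)

    async def stop(self) -> None:
        """Cancel all pending retry tasks so none are left dangling at shutdown."""
        for task in self._tasks:
            task.cancel()
        # Wait for cancellation to complete, ignoring CancelledError results.
        await asyncio.gather(*self._tasks, return_exceptions=True)
        self._tasks.clear()
```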

Comment on lines +281 to 290
actor = self._actors.get(identity)

if actor:
    await actor.stop(msg)

    del self._actors[identity]
else:
    _logger.warning(
        "Actor for dispatch type %r is not running", stopping_dispatch.type
    )
Contributor

Tip:

Suggested change
-actor = self._actors.get(identity)
-if actor:
-    await actor.stop(msg)
-    del self._actors[identity]
-else:
-    _logger.warning(
-        "Actor for dispatch type %r is not running", stopping_dispatch.type
-    )
+if actor := self._actors.pop(identity, None):
+    await actor.stop(msg)
+else:
+    _logger.warning(
+        "Actor for dispatch type %r is not running", stopping_dispatch.type
+    )

@Marenz Marenz merged commit 13b35dc into frequenz-floss:v0.x.x Mar 3, 2025
14 checks passed
@Marenz Marenz deleted the lastly branch March 3, 2025 15:56