Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,12 @@

## Upgrading

- The dispatch high level interface now depends on `frequenz-sdk` version `v1.0.0-rc900`.
- We are now using the version `0.6.0` of the underlying `frequenz-client-dispatch` client library.
- The init parameter of the `Dispatcher` class has been changed to accept a `server_url` instead.
* `Dispatcher.running_state_change` now also sends a message when the duration specified in the dispatch has passed. If no duration is specified, no STOPPED message will be sent.

## New Features

* Using the new dispatch client, we now have support for pagination in the dispatch list request.
* The new client version also supports streaming, however it is not yet used internally in the high level interface.
<!-- Here goes the main new features and examples or instructions on how to use them -->

## Bug Fixes

- Fix documentation cross-linking to the `frequenz-client-dispatch` package.
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
14 changes: 12 additions & 2 deletions src/frequenz/dispatch/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,20 @@ def next_run_info() -> tuple[datetime, datetime] | None:
_logger.info("Dispatch %s scheduled for %s", dispatch.id, next_time)
await asyncio.sleep((next_time - now).total_seconds())

_logger.info("Dispatch ready: %s", dispatch)
_logger.info("Dispatch %s executing...", dispatch)
await self._running_state_change_sender.send(dispatch)

_logger.info("Dispatch finished: %s", dispatch)
# Wait for the duration of the dispatch if set
if dispatch.duration:
_logger.info(
"Dispatch %s running for %s", dispatch.id, dispatch.duration
)
await asyncio.sleep(dispatch.duration.total_seconds())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure a sleep will work? Doesn't this block receiving/running any other dispatch while the current dispatch is running? Shouldn't you use timers instead or this code runs on a separate task for every dispatch that is currently running?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes ,currently we have one task for each dispatch running.


_logger.info("Dispatch %s runtime duration reached", dispatch.id)
await self._running_state_change_sender.send(dispatch)

_logger.info("Dispatch completed: %s", dispatch)
self._scheduled.pop(dispatch.id)

def _running_state_change(
Expand Down
9 changes: 9 additions & 0 deletions tests/test_frequenz_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,15 @@ async def test_dispatch_schedule(
fake_time.shift(next_run - _now() - timedelta(seconds=1))
await asyncio.sleep(1)

# Expect notification of the dispatch being ready to run
ready_dispatch = await actor_env.ready_dispatches.receive()

assert ready_dispatch == dispatch

# Shift time to the end of the dispatch
fake_time.shift(dispatch.duration + timedelta(seconds=1))
await asyncio.sleep(1)

# Expect notification to stop the dispatch
done_dispatch = await actor_env.ready_dispatches.receive()
assert done_dispatch == dispatch
Loading