Skip to content

Commit 2539d02

Browse files
committed
Add scheduling class optimized for frequently changing timers
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 62c2248 commit 2539d02

File tree

4 files changed

+368
-0
lines changed

4 files changed

+368
-0
lines changed

RELEASE_NOTES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Frequenz channels Release Notes
22

3+
## New Features
4+
5+
- A new class `frequenz.channels.time_scheduler.TimeScheduler` was added. It's optimized for scenarios where events get added, rescheduled or canceled frequently.
6+
37
## Bug Fixes
48

59
- `FileWatcher`: Fixed `ready()` method to return False when an error occurs. Before this fix, `select()` (and other code using `ready()`) never detected the `FileWatcher` was stopped and the `select()` loop was continuously waking up to inform the receiver was ready.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ dev-pylint = [
7474
]
7575
dev-pytest = [
7676
"async-solipsism == 0.7",
77+
"time-machine == 2.15.0",
7778
"frequenz-repo-config[extra-lint-examples] == 0.10.0",
7879
"hypothesis == 6.111.2",
7980
"pytest == 8.3.2",
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Timer scheduler to schedule and events at specific times."""
5+
6+
import asyncio
7+
import heapq
8+
import itertools
9+
from dataclasses import dataclass, field
10+
from datetime import datetime, timedelta, timezone
11+
from typing import Dict, Generic, TypeVar
12+
13+
from frequenz.channels import Sender
14+
15+
T = TypeVar("T") # Generic type for the object associated with events
16+
17+
18+
@dataclass(order=True)
19+
class ScheduledEvent(Generic[T]):
20+
"""Represents an event scheduled to be dispatched at a specific time."""
21+
22+
scheduled_time: datetime
23+
obj: T = field(compare=False)
24+
unique_id: int = field(compare=False, default=0)
25+
canceled: bool = field(compare=False, default=False)
26+
27+
28+
class TimerScheduler(Generic[T]):
29+
"""Class to schedule and dispatch events at specific times.
30+
31+
Usage example:
32+
```python
33+
34+
import asyncio
35+
from frequenz.channels.timer_scheduler import TimerScheduler
36+
from frequenz.channels import Broadcast
37+
from datetime import timedelta
38+
39+
async def main():
40+
event_channel = Broadcast[str](name="events")
41+
sender = event_channel.new_sender()
42+
receiver = event_channel.new_receiver()
43+
44+
scheduler = TimerScheduler[str](sender)
45+
46+
scheduler.set_timer(fire_in=timedelta(seconds=5), obj="event1")
47+
scheduler.set_timer(fire_in=timedelta(seconds=10), obj="event2")
48+
scheduler.set_timer(fire_in=timedelta(seconds=10), obj="event3")
49+
50+
# Waits 5 seconds and returns "event1"
51+
assert await receiver.receive() == "event1"
52+
53+
# Remove the "event2" timer
54+
scheduler.unset_timer("event2")
55+
56+
# Reschedule "event3" to fire in 15 seconds
57+
scheduler.set_timer(fire_in=timedelta(seconds=15), obj="event3")
58+
59+
# Waits 15 more seconds and returns "event3"
60+
assert await receiver.receive() == "event3"
61+
"""
62+
63+
def __init__(self, sender: Sender[T]) -> None:
64+
"""Initialize the TimerScheduler with the given sender.
65+
66+
Parameters:
67+
sender: The sender to dispatch the events.
68+
"""
69+
self._sender = sender
70+
self._event_heap: list[ScheduledEvent[T]] = []
71+
self._obj_to_event: Dict[T, ScheduledEvent[T]] = {}
72+
self._counter = itertools.count()
73+
self._current_task: asyncio.Task[None] | None = None
74+
self._stopped = False
75+
76+
def set_timer(
77+
self,
78+
*,
79+
obj: T,
80+
fire_in: timedelta | None = None,
81+
fire_at: datetime | None = None,
82+
) -> bool:
83+
"""
84+
Schedule a new event or reschedule an existing one.
85+
86+
Args:
87+
obj: The object associated with the event.
88+
fire_in: Time after which the event should be dispatched. Conflicts with fire_at.
89+
fire_at: Time at which the event should be dispatched. Conflicts with fire_in.
90+
91+
Returns:
92+
True if the event was successfully scheduled; False otherwise.
93+
"""
94+
now = datetime.now(timezone.utc)
95+
96+
scheduled_time = (now + fire_in) if fire_in else fire_at
97+
assert scheduled_time, "Either 'fire_in' or 'fire_at' must be provided."
98+
99+
if scheduled_time < now:
100+
return False
101+
102+
# Check if the object is already scheduled
103+
if obj in self._obj_to_event:
104+
existing_event = self._obj_to_event[obj]
105+
existing_event.canceled = True # Mark the existing event as canceled
106+
107+
# Create a new scheduled event
108+
unique_id = next(self._counter)
109+
new_event = ScheduledEvent(
110+
scheduled_time=scheduled_time, unique_id=unique_id, obj=obj
111+
)
112+
heapq.heappush(self._event_heap, new_event)
113+
self._obj_to_event[obj] = new_event
114+
115+
# If the new event is the earliest, reset the waiting task
116+
if self._event_heap[0] == new_event:
117+
if self._current_task:
118+
self._current_task.cancel()
119+
self._current_task = asyncio.create_task(self._wait_and_dispatch())
120+
121+
return True
122+
123+
def unset_timer(self, obj: T) -> bool:
124+
"""
125+
Cancel a scheduled event associated with the given object.
126+
127+
Args:
128+
obj: The object associated with the event to cancel.
129+
130+
Returns:
131+
True if the event was found and canceled; False otherwise.
132+
"""
133+
if obj in self._obj_to_event:
134+
existing_event = self._obj_to_event[obj]
135+
existing_event.canceled = True # Mark the event as canceled
136+
del self._obj_to_event[obj]
137+
138+
# If the canceled event was the next to be dispatched, reset the waiting task
139+
if self._event_heap and self._event_heap[0].obj == obj:
140+
if self._current_task:
141+
self._current_task.cancel()
142+
self._current_task = asyncio.create_task(self._wait_and_dispatch())
143+
144+
return True
145+
146+
return False
147+
148+
async def _wait_and_dispatch(self) -> None:
149+
"""Wait for the next event to be due and dispatch it."""
150+
while not self._stopped:
151+
if not self._event_heap:
152+
self._current_task = None
153+
return
154+
155+
next_event = self._event_heap[0]
156+
now = datetime.now(timezone.utc)
157+
delay = (next_event.scheduled_time - now).total_seconds()
158+
159+
if delay <= 0:
160+
# Check if the event still exists
161+
if next_event.obj not in self._obj_to_event:
162+
# Skip canceled events
163+
heapq.heappop(self._event_heap)
164+
continue
165+
166+
# Event is due
167+
heapq.heappop(self._event_heap)
168+
del self._obj_to_event[next_event.obj]
169+
170+
if next_event.canceled:
171+
# Skip canceled events
172+
continue
173+
174+
# Dispatch the event
175+
await self._sender.send(next_event.obj)
176+
continue # Check for the next event
177+
178+
try:
179+
# Wait until the next event's scheduled_time
180+
self._current_task = asyncio.create_task(asyncio.sleep(delay))
181+
await self._current_task
182+
except asyncio.CancelledError:
183+
# A new earlier event was scheduled; exit to handle it
184+
return
185+
186+
async def stop(self) -> None:
187+
"""Stop the scheduler and cancel any pending tasks."""
188+
self._stopped = True
189+
if self._current_task:
190+
self._current_task.cancel()
191+
try:
192+
await self._current_task
193+
except asyncio.CancelledError:
194+
pass

tests/test_timer_scheduler.py

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Test module for the TimerScheduler class."""
5+
6+
import asyncio
7+
from datetime import datetime, timedelta
8+
9+
import async_solipsism
10+
import time_machine
11+
from pytest import fixture
12+
13+
from frequenz.channels import Broadcast
14+
from frequenz.channels.timer_scheduler import TimerScheduler
15+
16+
17+
@fixture
18+
def event_loop_policy() -> async_solipsism.EventLoopPolicy:
19+
"""Return an event loop policy that uses the async solipsism event loop."""
20+
return async_solipsism.EventLoopPolicy()
21+
22+
23+
async def test_set_timer() -> None:
24+
"""Test that a scheduled event is dispatched at the correct (mocked) time."""
25+
# Create a Broadcast channel
26+
bcast = Broadcast[str](name="test")
27+
28+
# Create a sender and receiver
29+
sender = bcast.new_sender()
30+
receiver = bcast.new_receiver()
31+
32+
# Initialize the TimerScheduler with the sender
33+
sched = TimerScheduler(sender)
34+
35+
# List to collect received events
36+
received_events = []
37+
38+
# Define the consumer coroutine
39+
async def consumer() -> None:
40+
async for event in receiver:
41+
received_events.append(event)
42+
43+
# Start the consumer as an asyncio task
44+
consumer_task = asyncio.create_task(consumer())
45+
46+
# Freeze time at 2024-01-01 12:00:00
47+
with time_machine.travel(datetime(2024, 1, 1, 12, 0, 0)):
48+
# Schedule 'event1' to fire in 10 seconds
49+
sched.set_timer(fire_in=timedelta(seconds=10), obj="event1")
50+
51+
# Advance time to 2024-01-01 12:00:10 to make 'event1' due
52+
with time_machine.travel(datetime(2024, 1, 1, 12, 0, 10)):
53+
# Allow some time for the event to be dispatched
54+
await asyncio.sleep(0.1)
55+
56+
# Assert that 'event1' was received
57+
assert (
58+
"event1" in received_events
59+
), "The event 'event1' was not dispatched as expected."
60+
61+
# Clean up by stopping the scheduler and cancelling the consumer task
62+
await sched.stop()
63+
consumer_task.cancel()
64+
try:
65+
await consumer_task
66+
except asyncio.CancelledError:
67+
pass
68+
69+
70+
async def test_reschedule_timer() -> None:
71+
"""Test that rescheduling an event updates its dispatch time correctly."""
72+
# Create a Broadcast channel
73+
bcast = Broadcast[str](name="test")
74+
75+
# Create a sender and receiver
76+
sender = bcast.new_sender()
77+
receiver = bcast.new_receiver()
78+
79+
# Initialize the TimerScheduler with the sender
80+
sched = TimerScheduler(sender)
81+
82+
# List to collect received events
83+
received_events = []
84+
85+
# Define the consumer coroutine
86+
async def consumer() -> None:
87+
async for event in receiver:
88+
received_events.append(event)
89+
90+
# Start the consumer as an asyncio task
91+
consumer_task = asyncio.create_task(consumer())
92+
93+
# Freeze time at 2024-01-01 12:00:00
94+
with time_machine.travel(datetime(2024, 1, 1, 12, 0, 0)):
95+
# Schedule 'event1' to fire in 10 seconds
96+
sched.set_timer(fire_in=timedelta(seconds=10), obj="event1")
97+
98+
# Reschedule 'event1' to fire in 5 seconds
99+
sched.set_timer(fire_in=timedelta(seconds=5), obj="event1")
100+
101+
# Advance time to 2024-01-01 12:00:05 to make 'event1' due
102+
with time_machine.travel(datetime(2024, 1, 1, 12, 0, 5)):
103+
# Allow some time for the event to be dispatched
104+
await asyncio.sleep(0.1)
105+
106+
# Assert that 'event1' was received only once
107+
assert (
108+
received_events.count("event1") == 1
109+
), "The event 'event1' was dispatched multiple times."
110+
111+
# Clean up by stopping the scheduler and cancelling the consumer task
112+
await sched.stop()
113+
consumer_task.cancel()
114+
try:
115+
await consumer_task
116+
except asyncio.CancelledError:
117+
pass
118+
119+
120+
async def test_unset_timer() -> None:
121+
"""Test that cancelling a scheduled event prevents it from being dispatched."""
122+
# Create a Broadcast channel
123+
bcast = Broadcast[str](name="test")
124+
125+
# Create a sender and receiver
126+
sender = bcast.new_sender()
127+
receiver = bcast.new_receiver()
128+
129+
# Initialize the TimerScheduler with the sender
130+
sched = TimerScheduler(sender)
131+
132+
# List to collect received events
133+
received_events = []
134+
135+
# Define the consumer coroutine
136+
async def consumer() -> None:
137+
async for event in receiver:
138+
received_events.append(event)
139+
140+
# Start the consumer as an asyncio task
141+
consumer_task = asyncio.create_task(consumer())
142+
143+
# Freeze time at 2024-01-01 12:00:00
144+
with time_machine.travel(datetime(2024, 1, 1, 12, 0, 0)):
145+
# Schedule 'event1' to fire in 10 seconds
146+
sched.set_timer(fire_in=timedelta(seconds=10), obj="event1")
147+
148+
# Advance time to 2024-01-01 12:00:05
149+
with time_machine.travel(datetime(2024, 1, 1, 12, 0, 5)):
150+
# Cancel 'event1' before it's due
151+
sched.unset_timer("event1")
152+
153+
# Advance time to 2024-01-01 12:00:10 to reach the original dispatch time
154+
with time_machine.travel(datetime(2024, 1, 1, 12, 0, 10)):
155+
# Allow some time for the dispatcher to process
156+
await asyncio.sleep(0.1)
157+
158+
# Assert that 'event1' was not received
159+
assert (
160+
"event1" not in received_events
161+
), "The event 'event1' was dispatched despite being canceled."
162+
163+
# Clean up by stopping the scheduler and cancelling the consumer task
164+
await sched.stop()
165+
consumer_task.cancel()
166+
try:
167+
await consumer_task
168+
except asyncio.CancelledError:
169+
pass

0 commit comments

Comments
 (0)