Skip to content

Commit e22d937

Browse files
mariusaemeta-codesync[bot]
authored andcommitted
time::Alarm (#1404)
Summary: Pull Request resolved: #1404 This introduces `time::Alarm`. It behaves like a (concurrent) alarm: its owner can arm, rearm, and disarm the alarm; an arbitrary number of sleepers can be awoken by the alarm. Each sleeper is awoken only once for each time the alarm is armed. This is useful when implementing timed coordination between concurrent tasks, such as is the case when flushing buffers, etc. This will be used to implement timed flushing (i.e., Nagle's algorithm) for port reducers. ghstack-source-id: 313884428 Reviewed By: shayne-fletcher Differential Revision: D83768513 fbshipit-source-id: 13f32327c2d0a9f3075cabbd79a354484b427d64
1 parent 2c5907b commit e22d937

File tree

3 files changed

+218
-0
lines changed

3 files changed

+218
-0
lines changed

hyperactor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ maplit = "1.0"
9393
proptest = "1.5"
9494
tempfile = "3.22"
9595
timed_test = { version = "0.0.0", path = "../timed_test" }
96+
tokio-test = "0.4.4"
9697
tracing-subscriber = { version = "0.3.20", features = ["chrono", "env-filter", "json", "local-time", "parking_lot", "registry"] }
9798
tracing-test = { version = "0.2.3", features = ["no-env-filter"] }
9899

hyperactor/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ pub mod supervision;
9292
pub mod sync;
9393
/// Test utilities
9494
pub mod test_utils;
95+
mod time;
9596

9697
pub use actor::Actor;
9798
pub use actor::ActorHandle;

hyperactor/src/time.rs

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
#![allow(dead_code)] // until next diff..
10+
11+
//! This module contains various utilities for dealing with time.
12+
13+
use std::sync::Arc;
14+
use std::sync::Mutex;
15+
use std::time::Duration;
16+
17+
use tokio::sync::Notify;
18+
use tokio::time::Instant;
19+
use tokio::time::sleep_until;
20+
21+
use crate::clock::Clock;
22+
use crate::clock::RealClock;
23+
24+
/// An alarm that can be Armed to fire at some future time.
25+
///
26+
/// Alarm is itself owned, and may have multiple sleepers attached
27+
/// to it. Each sleeper is awoken at most once for each alarm that has
28+
/// been set.
29+
///
30+
/// When instances of `Alarm` are dropped, sleepers are awoken,
31+
/// returning `false`, indicating that the alarm is defunct.
32+
pub struct Alarm {
33+
status: Arc<Mutex<AlarmStatus>>,
34+
notify: Arc<Notify>,
35+
version: usize,
36+
}
37+
enum AlarmStatus {
38+
Unarmed,
39+
Armed { version: usize, deadline: Instant },
40+
Dropped,
41+
}
42+
43+
impl Alarm {
44+
/// Create a new, unset alarm.
45+
pub fn new() -> Self {
46+
Self {
47+
status: Arc::new(Mutex::new(AlarmStatus::Unarmed)),
48+
notify: Arc::new(Notify::new()),
49+
version: 0,
50+
}
51+
}
52+
53+
/// Arm the alarm to fire after the provided duration.
54+
pub fn arm(&mut self, duration: Duration) {
55+
let mut status = self.status.lock().unwrap();
56+
*status = AlarmStatus::Armed {
57+
version: self.version,
58+
deadline: RealClock.now() + duration,
59+
};
60+
drop(status);
61+
self.notify.notify_waiters();
62+
self.version += 1;
63+
}
64+
65+
/// Disarm the alarm, canceling any pending alarms.
66+
pub fn disarm(&mut self) {
67+
let mut status = self.status.lock().unwrap();
68+
*status = AlarmStatus::Unarmed;
69+
drop(status);
70+
// Not technically needed (sleepers will still converge),
71+
// but this clears up the timers:
72+
self.notify.notify_waiters();
73+
}
74+
75+
/// Fire the alarm immediately.
76+
pub fn fire(&mut self) {
77+
self.arm(Duration::from_millis(0))
78+
}
79+
80+
/// Create a new sleeper for this alarm. Many sleepers can wait for the alarm
81+
/// to fire at any given time.
82+
pub fn sleeper(&self) -> AlarmSleeper {
83+
AlarmSleeper {
84+
status: Arc::clone(&self.status),
85+
notify: Arc::clone(&self.notify),
86+
min_version: 0,
87+
}
88+
}
89+
}
90+
91+
impl Drop for Alarm {
92+
fn drop(&mut self) {
93+
let mut status = self.status.lock().unwrap();
94+
*status = AlarmStatus::Dropped;
95+
drop(status);
96+
self.notify.notify_waiters();
97+
}
98+
}
99+
100+
impl Default for Alarm {
101+
fn default() -> Self {
102+
Self::new()
103+
}
104+
}
105+
106+
/// A single alarm sleeper.
107+
pub struct AlarmSleeper {
108+
status: Arc<Mutex<AlarmStatus>>,
109+
notify: Arc<Notify>,
110+
min_version: usize,
111+
}
112+
113+
impl AlarmSleeper {
114+
/// Sleep until the alarm fires. Returns true if the alarm fired,
115+
/// and false if the alarm has been dropped.
116+
///
117+
/// Sleep will fire (return true) at most once for each time the
118+
/// alarm is set.
119+
pub async fn sleep(&mut self) -> bool {
120+
loop {
121+
// Obtain a notifier before checking the state, to avoid the unlock-notify race.
122+
let notified = self.notify.notified();
123+
let deadline = match *self.status.lock().unwrap() {
124+
AlarmStatus::Dropped => return false,
125+
AlarmStatus::Unarmed => None,
126+
AlarmStatus::Armed { version, .. } if version < self.min_version => None,
127+
AlarmStatus::Armed { version, deadline } if RealClock.now() >= deadline => {
128+
self.min_version = version + 1;
129+
return true;
130+
}
131+
AlarmStatus::Armed {
132+
version: _,
133+
deadline,
134+
} => Some(deadline.clone()),
135+
};
136+
137+
if let Some(deadline) = deadline {
138+
tokio::select! {
139+
_ = sleep_until(deadline) => (),
140+
_ = notified => (),
141+
}
142+
} else {
143+
notified.await;
144+
}
145+
}
146+
}
147+
}
148+
149+
#[cfg(test)]
150+
mod tests {
151+
use std::time::Duration;
152+
153+
use tokio_test::assert_pending;
154+
use tokio_test::task;
155+
156+
use super::*;
157+
158+
#[tokio::test]
159+
async fn test_basic() {
160+
let mut alarm = Alarm::new();
161+
let mut sleeper = alarm.sleeper();
162+
let handle = tokio::spawn(async move { sleeper.sleep().await });
163+
assert!(!handle.is_finished()); // not super meaningful..
164+
165+
alarm.fire();
166+
167+
assert!(handle.await.unwrap());
168+
169+
let mut sleeper = alarm.sleeper();
170+
alarm.arm(Duration::from_secs(600));
171+
let handle = tokio::spawn(async move { sleeper.sleep().await });
172+
drop(alarm);
173+
// Dropped:
174+
assert!(!handle.await.unwrap());
175+
}
176+
177+
#[tokio::test]
178+
async fn test_sleep_once() {
179+
let mut alarm = Alarm::new();
180+
alarm.fire();
181+
let mut sleeper = alarm.sleeper();
182+
assert!(sleeper.sleep().await);
183+
184+
// Don't wake up again:
185+
assert_pending!(task::spawn(sleeper.sleep()).poll());
186+
alarm.fire();
187+
assert!(sleeper.sleep().await);
188+
// Don't wake up again:
189+
assert_pending!(task::spawn(sleeper.sleep()).poll());
190+
drop(alarm);
191+
assert!(!sleeper.sleep().await);
192+
}
193+
194+
#[tokio::test]
195+
async fn test_reset() {
196+
let mut alarm = Alarm::new();
197+
alarm.arm(Duration::from_secs(600));
198+
let mut sleeper = alarm.sleeper();
199+
assert_pending!(task::spawn(sleeper.sleep()).poll());
200+
// Should reset after setting to an earlier time:
201+
alarm.arm(Duration::from_millis(10));
202+
assert!(sleeper.sleep().await);
203+
}
204+
205+
#[tokio::test]
206+
async fn test_disarm() {
207+
let mut alarm = Alarm::new();
208+
alarm.arm(Duration::from_secs(600));
209+
let mut sleeper = alarm.sleeper();
210+
assert_pending!(task::spawn(sleeper.sleep()).poll());
211+
alarm.disarm();
212+
assert_pending!(task::spawn(sleeper.sleep()).poll());
213+
alarm.arm(Duration::from_millis(10));
214+
assert!(sleeper.sleep().await);
215+
}
216+
}

0 commit comments

Comments
 (0)