Skip to content

Commit aa93f58

Browse files
thomasywangfacebook-github-bot
authored andcommitted
Use tokio::time::Duration and tokio::time::Instant for timekeeping (#938)
Summary: Previously we had to use u64 for serialization reasons but those reasons no longer exist Reviewed By: dulinriley Differential Revision: D80556690
1 parent dbce7b4 commit aa93f58

File tree

3 files changed

+72
-66
lines changed

3 files changed

+72
-66
lines changed

hyperactor/src/channel/sim.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use super::*;
2222
use crate::channel;
2323
use crate::clock::Clock;
2424
use crate::clock::RealClock;
25-
use crate::clock::SimClock;
2625
use crate::data::Serialized;
2726
use crate::mailbox::MessageEnvelope;
2827
use crate::simnet;
@@ -129,7 +128,7 @@ pub(crate) struct MessageDeliveryEvent {
129128
src_addr: Option<ChannelAddr>,
130129
dest_addr: ChannelAddr,
131130
data: Serialized,
132-
duration_ms: u64,
131+
duration: tokio::time::Duration,
133132
}
134133

135134
impl MessageDeliveryEvent {
@@ -139,7 +138,7 @@ impl MessageDeliveryEvent {
139138
src_addr,
140139
dest_addr,
141140
data,
142-
duration_ms: 100,
141+
duration: tokio::time::Duration::from_millis(100),
143142
}
144143
}
145144
}
@@ -158,8 +157,8 @@ impl Event for MessageDeliveryEvent {
158157
Ok(())
159158
}
160159

161-
fn duration_ms(&self) -> u64 {
162-
self.duration_ms
160+
fn duration(&self) -> tokio::time::Duration {
161+
self.duration
163162
}
164163

165164
fn summary(&self) -> String {
@@ -178,12 +177,12 @@ impl Event for MessageDeliveryEvent {
178177
src: src_addr.clone(),
179178
dst: self.dest_addr.clone(),
180179
};
181-
self.duration_ms = topology
180+
self.duration = topology
182181
.lock()
183182
.await
184183
.topology
185184
.get(&edge)
186-
.map_or_else(|| 1, |v| v.latency.as_millis() as u64);
185+
.map_or_else(|| tokio::time::Duration::from_millis(1), |v| v.latency);
187186
}
188187
}
189188
}
@@ -332,7 +331,7 @@ impl<M: RemoteMessage> Tx<M> for SimTx<M> {
332331
self.dst_addr.clone(),
333332
data,
334333
)),
335-
time: SimClock.millis_since_start(RealClock.now()),
334+
time: RealClock.now(),
336335
}),
337336
_ => handle.send_event(Box::new(MessageDeliveryEvent::new(
338337
self.src_addr.clone(),
@@ -551,7 +550,10 @@ mod tests {
551550
.await
552551
.unwrap();
553552

554-
assert_eq!(SimClock.millis_since_start(RealClock.now()), 0);
553+
assert_eq!(
554+
SimClock.duration_since_start(RealClock.now()),
555+
tokio::time::Duration::ZERO
556+
);
555557
// Fast forward real time to 5 seconds
556558
tokio::time::advance(tokio::time::Duration::from_secs(5)).await;
557559
{

hyperactor/src/clock.rs

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -197,11 +197,7 @@ impl Clock for SimClock {
197197

198198
simnet_handle()
199199
.unwrap()
200-
.send_event(SleepEvent::new(
201-
tx.bind(),
202-
mailbox,
203-
duration.as_millis() as u64,
204-
))
200+
.send_event(SleepEvent::new(tx.bind(), mailbox, duration))
205201
.unwrap();
206202
rx.recv().await.unwrap();
207203
}
@@ -212,11 +208,7 @@ impl Clock for SimClock {
212208

213209
simnet_handle()
214210
.unwrap()
215-
.send_nonadvanceable_event(SleepEvent::new(
216-
tx.bind(),
217-
mailbox,
218-
duration.as_millis() as u64,
219-
))
211+
.send_nonadvanceable_event(SleepEvent::new(tx.bind(), mailbox, duration))
220212
.unwrap();
221213
rx.recv().await.unwrap();
222214
}
@@ -247,11 +239,7 @@ impl Clock for SimClock {
247239

248240
simnet_handle()
249241
.unwrap()
250-
.send_event(SleepEvent::new(
251-
tx.bind(),
252-
mailbox,
253-
duration.as_millis() as u64,
254-
))
242+
.send_event(SleepEvent::new(tx.bind(), mailbox, duration))
255243
.unwrap();
256244

257245
let fut = f;
@@ -290,14 +278,19 @@ impl SimClock {
290278
}
291279

292280
/// Advance the sumulator's time to the specified instant
293-
pub fn advance_to(&self, millis: u64) {
281+
pub fn advance_to(&self, time: tokio::time::Instant) {
294282
let mut guard = SIM_TIME.now.lock().unwrap();
295-
*guard = SIM_TIME.start + tokio::time::Duration::from_millis(millis);
283+
*guard = time;
296284
}
297285

298286
/// Get the number of milliseconds elapsed since the start of the simulation
299-
pub fn millis_since_start(&self, instant: tokio::time::Instant) -> u64 {
300-
instant.duration_since(SIM_TIME.start).as_millis() as u64
287+
pub fn duration_since_start(&self, instant: tokio::time::Instant) -> tokio::time::Duration {
288+
instant.duration_since(SIM_TIME.start)
289+
}
290+
291+
/// Instant marking the start of the simulation
292+
pub fn start(&self) -> tokio::time::Instant {
293+
SIM_TIME.start.clone()
301294
}
302295
}
303296

@@ -347,10 +340,16 @@ mod tests {
347340
#[tokio::test]
348341
async fn test_sim_clock_simple() {
349342
let start = SimClock.now();
350-
assert_eq!(SimClock.millis_since_start(start), 0);
351-
SimClock.advance_to(10000);
343+
assert_eq!(
344+
SimClock.duration_since_start(start),
345+
tokio::time::Duration::ZERO
346+
);
347+
SimClock.advance_to(SimClock.start() + tokio::time::Duration::from_millis(10000));
352348
let end = SimClock.now();
353-
assert_eq!(SimClock.millis_since_start(end), 10000);
349+
assert_eq!(
350+
SimClock.duration_since_start(end),
351+
tokio::time::Duration::from_millis(10000)
352+
);
354353
assert_eq!(
355354
end.duration_since(start),
356355
tokio::time::Duration::from_secs(10)
@@ -360,7 +359,7 @@ mod tests {
360359
#[tokio::test]
361360
async fn test_sim_clock_system_time() {
362361
let start = SimClock.system_time_now();
363-
SimClock.advance_to(10000);
362+
SimClock.advance_to(SimClock.start() + tokio::time::Duration::from_millis(10000));
364363
let end = SimClock.system_time_now();
365364
assert_eq!(
366365
end.duration_since(start).unwrap(),

hyperactor/src/simnet.rs

Lines changed: 39 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ const OPERATIONAL_MESSAGE_BUFFER_SIZE: usize = 8;
7070
pub trait Address: Hash + Debug + Eq + PartialEq + Ord + PartialOrd + Clone {}
7171
impl<A: Hash + Debug + Eq + PartialEq + Ord + PartialOrd + Clone> Address for A {}
7272

73-
type SimulatorTimeInstant = u64;
73+
type SimulatorTimeInstant = tokio::time::Instant;
7474

7575
/// The unit of execution for the simulator.
7676
/// Using handle(), simnet can schedule executions in the network.
@@ -99,7 +99,7 @@ pub trait Event: Send + Sync + Debug {
9999

100100
/// The latency of the event. This could be network latency, induced latency (sleep), or
101101
/// GPU work latency.
102-
fn duration_ms(&self) -> u64;
102+
fn duration(&self) -> tokio::time::Duration;
103103

104104
/// Read the simnet config and update self accordingly.
105105
async fn read_simnet_config(&mut self, _topology: &Arc<Mutex<SimNetConfig>>) {}
@@ -126,8 +126,8 @@ impl Event for NodeJoinEvent {
126126
self.handle().await
127127
}
128128

129-
fn duration_ms(&self) -> u64 {
130-
0
129+
fn duration(&self) -> tokio::time::Duration {
130+
tokio::time::Duration::ZERO
131131
}
132132

133133
fn summary(&self) -> String {
@@ -139,15 +139,19 @@ impl Event for NodeJoinEvent {
139139
pub(crate) struct SleepEvent {
140140
done_tx: OncePortRef<()>,
141141
mailbox: Mailbox,
142-
duration_ms: u64,
142+
duration: tokio::time::Duration,
143143
}
144144

145145
impl SleepEvent {
146-
pub(crate) fn new(done_tx: OncePortRef<()>, mailbox: Mailbox, duration_ms: u64) -> Box<Self> {
146+
pub(crate) fn new(
147+
done_tx: OncePortRef<()>,
148+
mailbox: Mailbox,
149+
duration: tokio::time::Duration,
150+
) -> Box<Self> {
147151
Box::new(Self {
148152
done_tx,
149153
mailbox,
150-
duration_ms,
154+
duration,
151155
})
152156
}
153157
}
@@ -166,12 +170,12 @@ impl Event for SleepEvent {
166170
Ok(())
167171
}
168172

169-
fn duration_ms(&self) -> u64 {
170-
self.duration_ms
173+
fn duration(&self) -> tokio::time::Duration {
174+
self.duration
171175
}
172176

173177
fn summary(&self) -> String {
174-
format!("Sleeping for {} ms", self.duration_ms)
178+
format!("Sleeping for {} ms", self.duration.as_millis())
175179
}
176180
}
177181

@@ -200,8 +204,8 @@ impl Event for TorchOpEvent {
200204
Ok(())
201205
}
202206

203-
fn duration_ms(&self) -> u64 {
204-
100
207+
fn duration(&self) -> tokio::time::Duration {
208+
tokio::time::Duration::from_millis(100)
205209
}
206210

207211
fn summary(&self) -> String {
@@ -561,22 +565,20 @@ impl SimNet {
561565
// Get latency
562566
event.read_simnet_config(&self.config).await;
563567
ScheduledEvent {
564-
time: SimClock.millis_since_start(
565-
SimClock.now() + tokio::time::Duration::from_millis(event.duration_ms()),
566-
),
568+
time: SimClock.now() + event.duration(),
567569
event,
568570
}
569571
}
570572

571573
/// Schedule the event into the network.
572574
fn schedule_event(&mut self, scheduled_event: ScheduledEvent, advanceable: bool) {
573-
let start_at = SimClock.millis_since_start(SimClock.now());
575+
let start_at = SimClock.now();
574576
let end_at = scheduled_event.time;
575577

576578
self.records.push(SimulatorEventRecord {
577579
summary: scheduled_event.event.summary(),
578-
start_at,
579-
end_at,
580+
start_at: SimClock.duration_since_start(start_at).as_millis() as u64,
581+
end_at: SimClock.duration_since_start(end_at).as_millis() as u64,
580582
});
581583

582584
if advanceable {
@@ -604,7 +606,7 @@ impl SimNet {
604606
) -> Vec<SimulatorEventRecord> {
605607
// The simulated number of milliseconds the training script
606608
// has spent waiting for the backend to resolve a future
607-
let mut training_script_waiting_time: u64 = 0;
609+
let mut training_script_waiting_time = tokio::time::Duration::from_millis(0);
608610
// Duration elapsed while only non_advanceable_events has events
609611
let mut debounce_timer: Option<tokio::time::Instant> = None;
610612
'outer: loop {
@@ -638,9 +640,7 @@ impl SimNet {
638640
.scheduled_events
639641
.first_key_value()
640642
.is_some_and(|(time, _)| {
641-
*time
642-
> SimClock.millis_since_start(RealClock.now())
643-
+ training_script_waiting_time
643+
*time > RealClock.now() + training_script_waiting_time
644644
})
645645
{
646646
tokio::task::yield_now().await;
@@ -705,8 +705,7 @@ impl SimNet {
705705
continue;
706706
};
707707
if training_script_state_rx.borrow().is_waiting() {
708-
let advanced_time =
709-
scheduled_time - SimClock.millis_since_start(SimClock.now());
708+
let advanced_time = scheduled_time - SimClock.now();
710709
training_script_waiting_time += advanced_time;
711710
}
712711
SimClock.advance_to(scheduled_time);
@@ -749,9 +748,9 @@ pub struct SimulatorEventRecord {
749748
/// Event dependent summary for user
750749
pub summary: String,
751750
/// The time at which the message delivery was started.
752-
pub start_at: SimulatorTimeInstant,
751+
pub start_at: u64,
753752
/// The time at which the message was delivered to the receiver.
754-
pub end_at: SimulatorTimeInstant,
753+
pub end_at: u64,
755754
}
756755

757756
/// A configuration for the network topology.
@@ -805,7 +804,7 @@ mod tests {
805804
src_addr: SimAddr,
806805
dest_addr: SimAddr,
807806
data: Serialized,
808-
duration_ms: u64,
807+
duration: tokio::time::Duration,
809808
dispatcher: Option<TestDispatcher>,
810809
}
811810

@@ -823,8 +822,8 @@ mod tests {
823822
}
824823
Ok(())
825824
}
826-
fn duration_ms(&self) -> u64 {
827-
self.duration_ms
825+
fn duration(&self) -> tokio::time::Duration {
826+
self.duration
828827
}
829828

830829
fn summary(&self) -> String {
@@ -840,12 +839,12 @@ mod tests {
840839
src: self.src_addr.addr().clone(),
841840
dst: self.dest_addr.addr().clone(),
842841
};
843-
self.duration_ms = config
842+
self.duration = config
844843
.lock()
845844
.await
846845
.topology
847846
.get(&edge)
848-
.map_or_else(|| 1, |v| v.latency.as_millis() as u64);
847+
.map_or_else(|| tokio::time::Duration::from_millis(1), |v| v.latency);
849848
}
850849
}
851850

@@ -860,7 +859,7 @@ mod tests {
860859
src_addr,
861860
dest_addr,
862861
data,
863-
duration_ms: 1,
862+
duration: tokio::time::Duration::from_millis(1),
864863
dispatcher,
865864
}
866865
}
@@ -1132,12 +1131,18 @@ mod tests {
11321131
start();
11331132

11341133
let start = SimClock.now();
1135-
assert_eq!(SimClock.millis_since_start(start), 0);
1134+
assert_eq!(
1135+
SimClock.duration_since_start(start),
1136+
tokio::time::Duration::ZERO
1137+
);
11361138

11371139
SimClock.sleep(tokio::time::Duration::from_secs(10)).await;
11381140

11391141
let end = SimClock.now();
1140-
assert_eq!(SimClock.millis_since_start(end), 10000);
1142+
assert_eq!(
1143+
SimClock.duration_since_start(end),
1144+
tokio::time::Duration::from_secs(10)
1145+
);
11411146
}
11421147

11431148
#[tokio::test]

0 commit comments

Comments
 (0)