@@ -70,7 +70,7 @@ const OPERATIONAL_MESSAGE_BUFFER_SIZE: usize = 8;
70
70
pub trait Address : Hash + Debug + Eq + PartialEq + Ord + PartialOrd + Clone { }
71
71
impl < A : Hash + Debug + Eq + PartialEq + Ord + PartialOrd + Clone > Address for A { }
72
72
73
- type SimulatorTimeInstant = u64 ;
73
+ type SimulatorTimeInstant = tokio :: time :: Instant ;
74
74
75
75
/// The unit of execution for the simulator.
76
76
/// Using handle(), simnet can schedule executions in the network.
@@ -99,7 +99,7 @@ pub trait Event: Send + Sync + Debug {
99
99
100
100
/// The latency of the event. This could be network latency, induced latency (sleep), or
101
101
/// GPU work latency.
102
- fn duration_ms ( & self ) -> u64 ;
102
+ fn duration ( & self ) -> tokio :: time :: Duration ;
103
103
104
104
/// Read the simnet config and update self accordingly.
105
105
async fn read_simnet_config ( & mut self , _topology : & Arc < Mutex < SimNetConfig > > ) { }
@@ -126,8 +126,8 @@ impl Event for NodeJoinEvent {
126
126
self . handle ( ) . await
127
127
}
128
128
129
- fn duration_ms ( & self ) -> u64 {
130
- 0
129
+ fn duration ( & self ) -> tokio :: time :: Duration {
130
+ tokio :: time :: Duration :: ZERO
131
131
}
132
132
133
133
fn summary ( & self ) -> String {
@@ -139,15 +139,19 @@ impl Event for NodeJoinEvent {
139
139
pub ( crate ) struct SleepEvent {
140
140
done_tx : OncePortRef < ( ) > ,
141
141
mailbox : Mailbox ,
142
- duration_ms : u64 ,
142
+ duration : tokio :: time :: Duration ,
143
143
}
144
144
145
145
impl SleepEvent {
146
- pub ( crate ) fn new ( done_tx : OncePortRef < ( ) > , mailbox : Mailbox , duration_ms : u64 ) -> Box < Self > {
146
+ pub ( crate ) fn new (
147
+ done_tx : OncePortRef < ( ) > ,
148
+ mailbox : Mailbox ,
149
+ duration : tokio:: time:: Duration ,
150
+ ) -> Box < Self > {
147
151
Box :: new ( Self {
148
152
done_tx,
149
153
mailbox,
150
- duration_ms ,
154
+ duration ,
151
155
} )
152
156
}
153
157
}
@@ -166,12 +170,12 @@ impl Event for SleepEvent {
166
170
Ok ( ( ) )
167
171
}
168
172
169
- fn duration_ms ( & self ) -> u64 {
170
- self . duration_ms
173
+ fn duration ( & self ) -> tokio :: time :: Duration {
174
+ self . duration
171
175
}
172
176
173
177
fn summary ( & self ) -> String {
174
- format ! ( "Sleeping for {} ms" , self . duration_ms )
178
+ format ! ( "Sleeping for {} ms" , self . duration . as_millis ( ) )
175
179
}
176
180
}
177
181
@@ -200,8 +204,8 @@ impl Event for TorchOpEvent {
200
204
Ok ( ( ) )
201
205
}
202
206
203
- fn duration_ms ( & self ) -> u64 {
204
- 100
207
+ fn duration ( & self ) -> tokio :: time :: Duration {
208
+ tokio :: time :: Duration :: from_millis ( 100 )
205
209
}
206
210
207
211
fn summary ( & self ) -> String {
@@ -561,22 +565,20 @@ impl SimNet {
561
565
// Get latency
562
566
event. read_simnet_config ( & self . config ) . await ;
563
567
ScheduledEvent {
564
- time : SimClock . millis_since_start (
565
- SimClock . now ( ) + tokio:: time:: Duration :: from_millis ( event. duration_ms ( ) ) ,
566
- ) ,
568
+ time : SimClock . now ( ) + event. duration ( ) ,
567
569
event,
568
570
}
569
571
}
570
572
571
573
/// Schedule the event into the network.
572
574
fn schedule_event ( & mut self , scheduled_event : ScheduledEvent , advanceable : bool ) {
573
- let start_at = SimClock . millis_since_start ( SimClock . now ( ) ) ;
575
+ let start_at = SimClock . now ( ) ;
574
576
let end_at = scheduled_event. time ;
575
577
576
578
self . records . push ( SimulatorEventRecord {
577
579
summary : scheduled_event. event . summary ( ) ,
578
- start_at,
579
- end_at,
580
+ start_at : SimClock . duration_since_start ( start_at ) . as_millis ( ) as u64 ,
581
+ end_at : SimClock . duration_since_start ( end_at ) . as_millis ( ) as u64 ,
580
582
} ) ;
581
583
582
584
if advanceable {
@@ -604,7 +606,7 @@ impl SimNet {
604
606
) -> Vec < SimulatorEventRecord > {
605
607
// The simulated number of milliseconds the training script
606
608
// has spent waiting for the backend to resolve a future
607
- let mut training_script_waiting_time: u64 = 0 ;
609
+ let mut training_script_waiting_time = tokio :: time :: Duration :: from_millis ( 0 ) ;
608
610
// Duration elapsed while only non_advanceable_events has events
609
611
let mut debounce_timer: Option < tokio:: time:: Instant > = None ;
610
612
' outer: loop {
@@ -638,9 +640,7 @@ impl SimNet {
638
640
. scheduled_events
639
641
. first_key_value ( )
640
642
. is_some_and ( |( time, _) | {
641
- * time
642
- > SimClock . millis_since_start ( RealClock . now ( ) )
643
- + training_script_waiting_time
643
+ * time > RealClock . now ( ) + training_script_waiting_time
644
644
} )
645
645
{
646
646
tokio:: task:: yield_now ( ) . await ;
@@ -705,8 +705,7 @@ impl SimNet {
705
705
continue ;
706
706
} ;
707
707
if training_script_state_rx. borrow ( ) . is_waiting ( ) {
708
- let advanced_time =
709
- scheduled_time - SimClock . millis_since_start ( SimClock . now ( ) ) ;
708
+ let advanced_time = scheduled_time - SimClock . now ( ) ;
710
709
training_script_waiting_time += advanced_time;
711
710
}
712
711
SimClock . advance_to ( scheduled_time) ;
@@ -749,9 +748,9 @@ pub struct SimulatorEventRecord {
749
748
/// Event dependent summary for user
750
749
pub summary : String ,
751
750
/// The time at which the message delivery was started.
752
- pub start_at : SimulatorTimeInstant ,
751
+ pub start_at : u64 ,
753
752
/// The time at which the message was delivered to the receiver.
754
- pub end_at : SimulatorTimeInstant ,
753
+ pub end_at : u64 ,
755
754
}
756
755
757
756
/// A configuration for the network topology.
@@ -805,7 +804,7 @@ mod tests {
805
804
src_addr : SimAddr ,
806
805
dest_addr : SimAddr ,
807
806
data : Serialized ,
808
- duration_ms : u64 ,
807
+ duration : tokio :: time :: Duration ,
809
808
dispatcher : Option < TestDispatcher > ,
810
809
}
811
810
@@ -823,8 +822,8 @@ mod tests {
823
822
}
824
823
Ok ( ( ) )
825
824
}
826
- fn duration_ms ( & self ) -> u64 {
827
- self . duration_ms
825
+ fn duration ( & self ) -> tokio :: time :: Duration {
826
+ self . duration
828
827
}
829
828
830
829
fn summary ( & self ) -> String {
@@ -840,12 +839,12 @@ mod tests {
840
839
src : self . src_addr . addr ( ) . clone ( ) ,
841
840
dst : self . dest_addr . addr ( ) . clone ( ) ,
842
841
} ;
843
- self . duration_ms = config
842
+ self . duration = config
844
843
. lock ( )
845
844
. await
846
845
. topology
847
846
. get ( & edge)
848
- . map_or_else ( || 1 , |v| v. latency . as_millis ( ) as u64 ) ;
847
+ . map_or_else ( || tokio :: time :: Duration :: from_millis ( 1 ) , |v| v. latency ) ;
849
848
}
850
849
}
851
850
@@ -860,7 +859,7 @@ mod tests {
860
859
src_addr,
861
860
dest_addr,
862
861
data,
863
- duration_ms : 1 ,
862
+ duration : tokio :: time :: Duration :: from_millis ( 1 ) ,
864
863
dispatcher,
865
864
}
866
865
}
@@ -1132,12 +1131,18 @@ mod tests {
1132
1131
start ( ) ;
1133
1132
1134
1133
let start = SimClock . now ( ) ;
1135
- assert_eq ! ( SimClock . millis_since_start( start) , 0 ) ;
1134
+ assert_eq ! (
1135
+ SimClock . duration_since_start( start) ,
1136
+ tokio:: time:: Duration :: ZERO
1137
+ ) ;
1136
1138
1137
1139
SimClock . sleep ( tokio:: time:: Duration :: from_secs ( 10 ) ) . await ;
1138
1140
1139
1141
let end = SimClock . now ( ) ;
1140
- assert_eq ! ( SimClock . millis_since_start( end) , 10000 ) ;
1142
+ assert_eq ! (
1143
+ SimClock . duration_since_start( end) ,
1144
+ tokio:: time:: Duration :: from_secs( 10 )
1145
+ ) ;
1141
1146
}
1142
1147
1143
1148
#[ tokio:: test]
0 commit comments