@@ -12,25 +12,17 @@ use std::error::Error;
12
12
use std:: fmt;
13
13
use std:: sync:: LazyLock ;
14
14
use std:: sync:: Mutex ;
15
- use std:: sync:: OnceLock ;
16
15
use std:: time:: SystemTime ;
17
16
17
+ use async_trait:: async_trait;
18
18
use futures:: pin_mut;
19
19
use hyperactor_telemetry:: TelemetryClock ;
20
20
use serde:: Deserialize ;
21
21
use serde:: Serialize ;
22
22
23
- use crate :: Mailbox ;
24
23
use crate :: channel:: ChannelAddr ;
25
- use crate :: data:: Named ;
26
- use crate :: id;
27
- use crate :: mailbox:: DeliveryError ;
28
- use crate :: mailbox:: MailboxSender ;
29
- use crate :: mailbox:: MessageEnvelope ;
30
- use crate :: mailbox:: Undeliverable ;
31
- use crate :: mailbox:: UndeliverableMailboxSender ;
32
- use crate :: mailbox:: monitored_return_handle;
33
- use crate :: simnet:: SleepEvent ;
24
+ use crate :: simnet:: Event ;
25
+ use crate :: simnet:: SimNetError ;
34
26
use crate :: simnet:: simnet_handle;
35
27
36
28
struct SimTime {
@@ -183,6 +175,39 @@ impl ClockKind {
183
175
}
184
176
}
185
177
178
+ #[ derive( Debug ) ]
179
+ struct SleepEvent {
180
+ done_tx : Option < tokio:: sync:: oneshot:: Sender < ( ) > > ,
181
+ duration_ms : u64 ,
182
+ }
183
+
184
+ impl SleepEvent {
185
+ pub ( crate ) fn new ( done_tx : tokio:: sync:: oneshot:: Sender < ( ) > , duration_ms : u64 ) -> Box < Self > {
186
+ Box :: new ( Self {
187
+ done_tx : Some ( done_tx) ,
188
+ duration_ms,
189
+ } )
190
+ }
191
+ }
192
+
193
+ #[ async_trait]
194
+ impl Event for SleepEvent {
195
+ async fn handle ( & mut self ) -> Result < ( ) , SimNetError > {
196
+ if self . done_tx . take ( ) . unwrap ( ) . send ( ( ) ) . is_err ( ) {
197
+ tracing:: error!( "Failed to send wakeup event" ) ;
198
+ }
199
+ Ok ( ( ) )
200
+ }
201
+
202
+ fn duration_ms ( & self ) -> u64 {
203
+ self . duration_ms
204
+ }
205
+
206
+ fn summary ( & self ) -> String {
207
+ format ! ( "Sleeping for {} ms" , self . duration_ms)
208
+ }
209
+ }
210
+
186
211
/// Clock to be used in simulator runs that allows the simnet to create a scheduled event for.
187
212
/// When the wakeup event becomes the next earliest scheduled event, the simnet will advance it's
188
213
/// time to the wakeup time and use the transmitter to wake up this green thread
@@ -192,33 +217,25 @@ pub struct SimClock;
192
217
impl Clock for SimClock {
193
218
/// Tell the simnet to wake up this green thread after the specified duration has pass on the simnet
194
219
async fn sleep ( & self , duration : tokio:: time:: Duration ) {
195
- let mailbox = SimClock :: mailbox ( ) . clone ( ) ;
196
- let ( tx, rx) = mailbox. open_once_port :: < ( ) > ( ) ;
220
+ let ( tx, rx) = tokio:: sync:: oneshot:: channel :: < ( ) > ( ) ;
197
221
198
222
simnet_handle ( )
199
223
. unwrap ( )
200
- . send_event ( SleepEvent :: new (
201
- tx. bind ( ) ,
202
- mailbox,
203
- duration. as_millis ( ) as u64 ,
204
- ) )
224
+ . send_event ( SleepEvent :: new ( tx, duration. as_millis ( ) as u64 ) )
205
225
. unwrap ( ) ;
206
- rx. recv ( ) . await . unwrap ( ) ;
226
+
227
+ rx. await . unwrap ( ) ;
207
228
}
208
229
209
230
async fn non_advancing_sleep ( & self , duration : tokio:: time:: Duration ) {
210
- let mailbox = SimClock :: mailbox ( ) . clone ( ) ;
211
- let ( tx, rx) = mailbox. open_once_port :: < ( ) > ( ) ;
231
+ let ( tx, rx) = tokio:: sync:: oneshot:: channel :: < ( ) > ( ) ;
212
232
213
233
simnet_handle ( )
214
234
. unwrap ( )
215
- . send_nonadvanceable_event ( SleepEvent :: new (
216
- tx. bind ( ) ,
217
- mailbox,
218
- duration. as_millis ( ) as u64 ,
219
- ) )
235
+ . send_nonadvanceable_event ( SleepEvent :: new ( tx, duration. as_millis ( ) as u64 ) )
220
236
. unwrap ( ) ;
221
- rx. recv ( ) . await . unwrap ( ) ;
237
+
238
+ rx. await . unwrap ( ) ;
222
239
}
223
240
224
241
async fn sleep_until ( & self , deadline : tokio:: time:: Instant ) {
@@ -242,23 +259,18 @@ impl Clock for SimClock {
242
259
where
243
260
F : std:: future:: Future < Output = T > ,
244
261
{
245
- let mailbox = SimClock :: mailbox ( ) . clone ( ) ;
246
- let ( tx, deadline_rx) = mailbox. open_once_port :: < ( ) > ( ) ;
262
+ let ( tx, deadline_rx) = tokio:: sync:: oneshot:: channel :: < ( ) > ( ) ;
247
263
248
264
simnet_handle ( )
249
265
. unwrap ( )
250
- . send_event ( SleepEvent :: new (
251
- tx. bind ( ) ,
252
- mailbox,
253
- duration. as_millis ( ) as u64 ,
254
- ) )
266
+ . send_event ( SleepEvent :: new ( tx, duration. as_millis ( ) as u64 ) )
255
267
. unwrap ( ) ;
256
268
257
269
let fut = f;
258
270
pin_mut ! ( fut) ;
259
271
260
272
tokio:: select! {
261
- _ = deadline_rx. recv ( ) => {
273
+ _ = deadline_rx => {
262
274
Err ( TimeoutError )
263
275
}
264
276
res = & mut fut => Ok ( res)
@@ -267,28 +279,6 @@ impl Clock for SimClock {
267
279
}
268
280
269
281
impl SimClock {
270
- // TODO (SF, 2025-07-11): Remove this global, thread through a mailbox
271
- // from upstack and handle undeliverable messages properly.
272
- fn mailbox ( ) -> & ' static Mailbox {
273
- static SIMCLOCK_MAILBOX : OnceLock < Mailbox > = OnceLock :: new ( ) ;
274
- SIMCLOCK_MAILBOX . get_or_init ( || {
275
- let mailbox = Mailbox :: new_detached ( id ! ( proc[ 0 ] . proc) . clone ( ) ) ;
276
- let ( undeliverable_messages, mut rx) =
277
- mailbox. open_port :: < Undeliverable < MessageEnvelope > > ( ) ;
278
- undeliverable_messages. bind_to ( Undeliverable :: < MessageEnvelope > :: port ( ) ) ;
279
- tokio:: spawn ( async move {
280
- while let Ok ( Undeliverable ( mut envelope) ) = rx. recv ( ) . await {
281
- envelope. try_set_error ( DeliveryError :: BrokenLink (
282
- "message returned to undeliverable port" . to_string ( ) ,
283
- ) ) ;
284
- UndeliverableMailboxSender
285
- . post ( envelope, /*unused */ monitored_return_handle ( ) )
286
- }
287
- } ) ;
288
- mailbox
289
- } )
290
- }
291
-
292
282
/// Advance the sumulator's time to the specified instant
293
283
pub fn advance_to ( & self , millis : u64 ) {
294
284
let mut guard = SIM_TIME . now . lock ( ) . unwrap ( ) ;
0 commit comments