@@ -12,25 +12,17 @@ use std::error::Error;
12
12
use std:: fmt;
13
13
use std:: sync:: LazyLock ;
14
14
use std:: sync:: Mutex ;
15
- use std:: sync:: OnceLock ;
16
15
use std:: time:: SystemTime ;
17
16
17
+ use async_trait:: async_trait;
18
18
use futures:: pin_mut;
19
19
use hyperactor_telemetry:: TelemetryClock ;
20
20
use serde:: Deserialize ;
21
21
use serde:: Serialize ;
22
22
23
- use crate :: Mailbox ;
24
23
use crate :: channel:: ChannelAddr ;
25
- use crate :: data:: Named ;
26
- use crate :: id;
27
- use crate :: mailbox:: DeliveryError ;
28
- use crate :: mailbox:: MailboxSender ;
29
- use crate :: mailbox:: MessageEnvelope ;
30
- use crate :: mailbox:: Undeliverable ;
31
- use crate :: mailbox:: UndeliverableMailboxSender ;
32
- use crate :: mailbox:: monitored_return_handle;
33
- use crate :: simnet:: SleepEvent ;
24
+ use crate :: simnet:: Event ;
25
+ use crate :: simnet:: SimNetError ;
34
26
use crate :: simnet:: simnet_handle;
35
27
36
28
struct SimTime {
@@ -183,6 +175,45 @@ impl ClockKind {
183
175
}
184
176
}
185
177
178
+ #[ derive( Debug ) ]
179
+ struct SleepEvent {
180
+ done_tx : Option < tokio:: sync:: oneshot:: Sender < ( ) > > ,
181
+ duration : tokio:: time:: Duration ,
182
+ }
183
+
184
+ impl SleepEvent {
185
+ pub ( crate ) fn new (
186
+ done_tx : tokio:: sync:: oneshot:: Sender < ( ) > ,
187
+ duration : tokio:: time:: Duration ,
188
+ ) -> Box < Self > {
189
+ Box :: new ( Self {
190
+ done_tx : Some ( done_tx) ,
191
+ duration,
192
+ } )
193
+ }
194
+ }
195
+
196
+ #[ async_trait]
197
+ impl Event for SleepEvent {
198
+ async fn handle ( & mut self ) -> Result < ( ) , SimNetError > {
199
+ self . done_tx
200
+ . take ( )
201
+ . unwrap ( )
202
+ . send ( ( ) )
203
+ . map_err ( |_| SimNetError :: PanickedTask ) ?;
204
+
205
+ Ok ( ( ) )
206
+ }
207
+
208
+ fn duration ( & self ) -> tokio:: time:: Duration {
209
+ self . duration
210
+ }
211
+
212
+ fn summary ( & self ) -> String {
213
+ format ! ( "Sleeping for {} ms" , self . duration. as_millis( ) )
214
+ }
215
+ }
216
+
186
217
/// Clock to be used in simulator runs that allows the simnet to create a scheduled event for.
187
218
/// When the wakeup event becomes the next earliest scheduled event, the simnet will advance it's
188
219
/// time to the wakeup time and use the transmitter to wake up this green thread
@@ -192,25 +223,25 @@ pub struct SimClock;
192
223
impl Clock for SimClock {
193
224
/// Tell the simnet to wake up this green thread after the specified duration has pass on the simnet
194
225
async fn sleep ( & self , duration : tokio:: time:: Duration ) {
195
- let mailbox = SimClock :: mailbox ( ) . clone ( ) ;
196
- let ( tx, rx) = mailbox. open_once_port :: < ( ) > ( ) ;
226
+ let ( tx, rx) = tokio:: sync:: oneshot:: channel :: < ( ) > ( ) ;
197
227
198
228
simnet_handle ( )
199
229
. unwrap ( )
200
- . send_event ( SleepEvent :: new ( tx. bind ( ) , mailbox , duration) )
230
+ . send_event ( SleepEvent :: new ( tx, duration) )
201
231
. unwrap ( ) ;
202
- rx. recv ( ) . await . unwrap ( ) ;
232
+
233
+ rx. await . unwrap ( ) ;
203
234
}
204
235
205
236
async fn non_advancing_sleep ( & self , duration : tokio:: time:: Duration ) {
206
- let mailbox = SimClock :: mailbox ( ) . clone ( ) ;
207
- let ( tx, rx) = mailbox. open_once_port :: < ( ) > ( ) ;
237
+ let ( tx, rx) = tokio:: sync:: oneshot:: channel :: < ( ) > ( ) ;
208
238
209
239
simnet_handle ( )
210
240
. unwrap ( )
211
- . send_nonadvanceable_event ( SleepEvent :: new ( tx. bind ( ) , mailbox , duration) )
241
+ . send_nonadvanceable_event ( SleepEvent :: new ( tx, duration) )
212
242
. unwrap ( ) ;
213
- rx. recv ( ) . await . unwrap ( ) ;
243
+
244
+ rx. await . unwrap ( ) ;
214
245
}
215
246
216
247
async fn sleep_until ( & self , deadline : tokio:: time:: Instant ) {
@@ -234,19 +265,18 @@ impl Clock for SimClock {
234
265
where
235
266
F : std:: future:: Future < Output = T > ,
236
267
{
237
- let mailbox = SimClock :: mailbox ( ) . clone ( ) ;
238
- let ( tx, deadline_rx) = mailbox. open_once_port :: < ( ) > ( ) ;
268
+ let ( tx, deadline_rx) = tokio:: sync:: oneshot:: channel :: < ( ) > ( ) ;
239
269
240
270
simnet_handle ( )
241
271
. unwrap ( )
242
- . send_event ( SleepEvent :: new ( tx. bind ( ) , mailbox , duration) )
272
+ . send_event ( SleepEvent :: new ( tx, duration) )
243
273
. unwrap ( ) ;
244
274
245
275
let fut = f;
246
276
pin_mut ! ( fut) ;
247
277
248
278
tokio:: select! {
249
- _ = deadline_rx. recv ( ) => {
279
+ _ = deadline_rx => {
250
280
Err ( TimeoutError )
251
281
}
252
282
res = & mut fut => Ok ( res)
@@ -255,28 +285,6 @@ impl Clock for SimClock {
255
285
}
256
286
257
287
impl SimClock {
258
- // TODO (SF, 2025-07-11): Remove this global, thread through a mailbox
259
- // from upstack and handle undeliverable messages properly.
260
- fn mailbox ( ) -> & ' static Mailbox {
261
- static SIMCLOCK_MAILBOX : OnceLock < Mailbox > = OnceLock :: new ( ) ;
262
- SIMCLOCK_MAILBOX . get_or_init ( || {
263
- let mailbox = Mailbox :: new_detached ( id ! ( proc[ 0 ] . proc) . clone ( ) ) ;
264
- let ( undeliverable_messages, mut rx) =
265
- mailbox. open_port :: < Undeliverable < MessageEnvelope > > ( ) ;
266
- undeliverable_messages. bind_to ( Undeliverable :: < MessageEnvelope > :: port ( ) ) ;
267
- tokio:: spawn ( async move {
268
- while let Ok ( Undeliverable ( mut envelope) ) = rx. recv ( ) . await {
269
- envelope. try_set_error ( DeliveryError :: BrokenLink (
270
- "message returned to undeliverable port" . to_string ( ) ,
271
- ) ) ;
272
- UndeliverableMailboxSender
273
- . post ( envelope, /*unused */ monitored_return_handle ( ) )
274
- }
275
- } ) ;
276
- mailbox
277
- } )
278
- }
279
-
280
288
/// Advance the sumulator's time to the specified instant
281
289
pub fn advance_to ( & self , time : tokio:: time:: Instant ) {
282
290
let mut guard = SIM_TIME . now . lock ( ) . unwrap ( ) ;
0 commit comments