10
10
// SimRx contains a way to receive messages.
11
11
12
12
//! Local simulated channel implementation.
13
+ use std:: any:: Any ;
13
14
// send leads to add to network.
14
15
use std:: marker:: PhantomData ;
15
16
use std:: sync:: Arc ;
16
17
17
18
use dashmap:: DashMap ;
18
19
use regex:: Regex ;
19
- use tokio:: sync:: Mutex ;
20
20
21
21
use super :: * ;
22
22
use crate :: channel;
@@ -25,12 +25,9 @@ use crate::clock::RealClock;
25
25
use crate :: clock:: SimClock ;
26
26
use crate :: data:: Serialized ;
27
27
use crate :: mailbox:: MessageEnvelope ;
28
- use crate :: simnet;
29
28
use crate :: simnet:: Dispatcher ;
30
29
use crate :: simnet:: Event ;
31
30
use crate :: simnet:: ScheduledEvent ;
32
- use crate :: simnet:: SimNetConfig ;
33
- use crate :: simnet:: SimNetEdge ;
34
31
use crate :: simnet:: SimNetError ;
35
32
use crate :: simnet:: simnet_handle;
36
33
@@ -126,20 +123,18 @@ impl fmt::Display for SimAddr {
126
123
/// Message Event that can be passed around in the simnet.
127
124
#[ derive( Debug ) ]
128
125
pub ( crate ) struct MessageDeliveryEvent {
129
- src_addr : Option < ChannelAddr > ,
130
126
dest_addr : ChannelAddr ,
131
127
data : Serialized ,
132
- duration_ms : u64 ,
128
+ latency : u64 ,
133
129
}
134
130
135
131
impl MessageDeliveryEvent {
136
132
/// Creates a new MessageDeliveryEvent.
137
- pub fn new ( src_addr : Option < ChannelAddr > , dest_addr : ChannelAddr , data : Serialized ) -> Self {
133
+ pub fn new ( dest_addr : ChannelAddr , data : Serialized , latency : u64 ) -> Self {
138
134
Self {
139
- src_addr,
140
135
dest_addr,
141
136
data,
142
- duration_ms : 100 ,
137
+ latency ,
143
138
}
144
139
}
145
140
}
@@ -149,42 +144,17 @@ impl Event for MessageDeliveryEvent {
149
144
async fn handle ( & mut self ) -> Result < ( ) , SimNetError > {
150
145
// Send the message to the correct receiver.
151
146
SENDER
152
- . send (
153
- self . src_addr . clone ( ) ,
154
- self . dest_addr . clone ( ) ,
155
- self . data . clone ( ) ,
156
- )
147
+ . send ( self . dest_addr . clone ( ) , self . data . clone ( ) )
157
148
. await ?;
158
149
Ok ( ( ) )
159
150
}
160
151
161
152
fn duration_ms ( & self ) -> u64 {
162
- self . duration_ms
153
+ self . latency
163
154
}
164
155
165
156
fn summary ( & self ) -> String {
166
- format ! (
167
- "Sending message from {} to {}" ,
168
- self . src_addr
169
- . as_ref( )
170
- . map_or( "unknown" . to_string( ) , |addr| addr. to_string( ) ) ,
171
- self . dest_addr. clone( )
172
- )
173
- }
174
-
175
- async fn read_simnet_config ( & mut self , topology : & Arc < Mutex < SimNetConfig > > ) {
176
- if let Some ( src_addr) = & self . src_addr {
177
- let edge = SimNetEdge {
178
- src : src_addr. clone ( ) ,
179
- dst : self . dest_addr . clone ( ) ,
180
- } ;
181
- self . duration_ms = topology
182
- . lock ( )
183
- . await
184
- . topology
185
- . get ( & edge)
186
- . map_or_else ( || 1 , |v| v. latency . as_millis ( ) as u64 ) ;
187
- }
157
+ format ! ( "Sending message to {}" , self . dest_addr. clone( ) )
188
158
}
189
159
}
190
160
@@ -194,12 +164,6 @@ pub async fn bind(addr: ChannelAddr) -> anyhow::Result<(), SimNetError> {
194
164
simnet_handle ( ) ?. bind ( addr)
195
165
}
196
166
197
- /// Update the configuration for simnet.
198
- pub async fn update_config ( config : simnet:: NetworkConfig ) -> anyhow:: Result < ( ) , SimNetError > {
199
- // Only update network config for now, will add host config in the future.
200
- simnet_handle ( ) ?. update_network_config ( config) . await
201
- }
202
-
203
167
/// Returns a simulated channel address that is bound to "any" channel address.
204
168
pub ( crate ) fn any ( transport : ChannelTransport ) -> ChannelAddr {
205
169
ChannelAddr :: Sim ( SimAddr {
@@ -274,12 +238,7 @@ fn create_egress_sender(
274
238
275
239
#[ async_trait]
276
240
impl Dispatcher < ChannelAddr > for SimDispatcher {
277
- async fn send (
278
- & self ,
279
- _src_addr : Option < ChannelAddr > ,
280
- addr : ChannelAddr ,
281
- data : Serialized ,
282
- ) -> Result < ( ) , SimNetError > {
241
+ async fn send ( & self , addr : ChannelAddr , data : Serialized ) -> Result < ( ) , SimNetError > {
283
242
self . dispatchers
284
243
. get ( & addr)
285
244
. ok_or_else ( || {
@@ -318,27 +277,34 @@ pub(crate) struct SimRx<M: RemoteMessage> {
318
277
}
319
278
320
279
#[ async_trait]
321
- impl < M : RemoteMessage > Tx < M > for SimTx < M > {
280
+ impl < M : RemoteMessage + Any > Tx < M > for SimTx < M > {
322
281
fn try_post ( & self , message : M , _return_handle : oneshot:: Sender < M > ) -> Result < ( ) , SendError < M > > {
323
282
let data = match Serialized :: serialize ( & message) {
324
283
Ok ( data) => data,
325
284
Err ( err) => return Err ( SendError ( err. into ( ) , message) ) ,
326
285
} ;
286
+
287
+ let envelope = ( & message as & dyn Any )
288
+ . downcast_ref :: < MessageEnvelope > ( )
289
+ . expect ( "RemoteMessage should always be a MessageEnvelope" ) ;
290
+
291
+ let ( sender, dest) = ( envelope. sender ( ) . clone ( ) , envelope. dest ( ) . 0 . clone ( ) ) ;
292
+
327
293
match simnet_handle ( ) {
328
- Ok ( handle) => match & self . src_addr {
329
- Some ( _) if self . client => handle. send_scheduled_event ( ScheduledEvent {
330
- event : Box :: new ( MessageDeliveryEvent :: new (
331
- self . src_addr . clone ( ) ,
332
- self . dst_addr . clone ( ) ,
333
- data,
334
- ) ) ,
335
- time : SimClock . millis_since_start ( RealClock . now ( ) ) ,
336
- } ) ,
337
- _ => handle. send_event ( Box :: new ( MessageDeliveryEvent :: new (
338
- self . src_addr . clone ( ) ,
294
+ Ok ( handle) => {
295
+ let event = Box :: new ( MessageDeliveryEvent :: new (
339
296
self . dst_addr . clone ( ) ,
340
297
data,
341
- ) ) ) ,
298
+ handle. sample_latency ( sender. proc_id ( ) , dest. proc_id ( ) ) ,
299
+ ) ) ;
300
+
301
+ match & self . src_addr {
302
+ Some ( _) if self . client => handle. send_scheduled_event ( ScheduledEvent {
303
+ event,
304
+ time : SimClock . millis_since_start ( RealClock . now ( ) ) ,
305
+ } ) ,
306
+ _ => handle. send_event ( event) ,
307
+ }
342
308
}
343
309
. map_err ( |err : SimNetError | SendError ( ChannelError :: from ( err) , message) ) ,
344
310
Err ( err) => Err ( SendError ( ChannelError :: from ( err) , message) ) ,
@@ -410,19 +376,27 @@ impl<M: RemoteMessage> Rx<M> for SimRx<M> {
410
376
mod tests {
411
377
use std:: iter:: zip;
412
378
379
+ use ndslice:: extent;
380
+
413
381
use super :: * ;
382
+ use crate :: PortId ;
383
+ use crate :: attrs:: Attrs ;
414
384
use crate :: clock:: Clock ;
415
385
use crate :: clock:: RealClock ;
416
386
use crate :: clock:: SimClock ;
417
- use crate :: simnet:: NetworkConfig ;
387
+ use crate :: id;
388
+ use crate :: simnet;
389
+ use crate :: simnet:: LatencyConfig ;
418
390
use crate :: simnet:: start;
391
+ use crate :: simnet:: start_with_config;
419
392
420
393
#[ tokio:: test]
421
394
async fn test_sim_basic ( ) {
422
395
let dst_ok = vec ! [ "tcp:[::1]:1234" , "tcp:127.0.0.1:8080" , "local:123" ] ;
423
396
let srcs_ok = vec ! [ "tcp:[::2]:1234" , "tcp:127.0.0.2:8080" , "local:124" ] ;
424
397
425
398
start ( ) ;
399
+ let handle = simnet_handle ( ) . unwrap ( ) ;
426
400
427
401
// TODO: New NodeAdd event should do this for you..
428
402
for addr in dst_ok. iter ( ) . chain ( srcs_ok. iter ( ) ) {
@@ -439,10 +413,24 @@ mod tests {
439
413
)
440
414
. unwrap ( ) ;
441
415
442
- let ( _, mut rx) = sim:: serve :: < u64 > ( dst_addr. clone ( ) ) . unwrap ( ) ;
443
- let tx = sim:: dial :: < u64 > ( dst_addr) . unwrap ( ) ;
444
- tx. try_post ( 123 , oneshot:: channel ( ) . 0 ) . unwrap ( ) ;
445
- assert_eq ! ( rx. recv( ) . await . unwrap( ) , 123 ) ;
416
+ let ( _, mut rx) = sim:: serve :: < MessageEnvelope > ( dst_addr. clone ( ) ) . unwrap ( ) ;
417
+ let tx = sim:: dial :: < MessageEnvelope > ( dst_addr) . unwrap ( ) ;
418
+ let data = Serialized :: serialize ( & 456 ) . unwrap ( ) ;
419
+ let sender = id ! ( world[ 0 ] . hello) ;
420
+ let dest = id ! ( world[ 1 ] . hello) ;
421
+ let ext = extent ! ( region = 1 , dc = 1 , rack = 4 , host = 4 , gpu = 8 ) ;
422
+ handle. register_proc (
423
+ sender. proc_id ( ) . clone ( ) ,
424
+ ext. point ( vec ! [ 0 , 0 , 0 , 0 , 0 ] ) . unwrap ( ) ,
425
+ ) ;
426
+ handle. register_proc (
427
+ dest. proc_id ( ) . clone ( ) ,
428
+ ext. point ( vec ! [ 0 , 0 , 0 , 1 , 0 ] ) . unwrap ( ) ,
429
+ ) ;
430
+
431
+ let msg = MessageEnvelope :: new ( sender, PortId ( dest, 0 ) , data. clone ( ) , Attrs :: new ( ) ) ;
432
+ tx. try_post ( msg, oneshot:: channel ( ) . 0 ) . unwrap ( ) ;
433
+ assert_eq ! ( * rx. recv( ) . await . unwrap( ) . data( ) , data) ;
446
434
}
447
435
448
436
let records = sim:: simnet_handle ( ) . unwrap ( ) . close ( ) . await . unwrap ( ) ;
@@ -481,30 +469,47 @@ mod tests {
481
469
482
470
#[ tokio:: test]
483
471
async fn test_realtime_frontier ( ) {
484
- start ( ) ;
485
-
486
472
tokio:: time:: pause ( ) ;
473
+ // 1 second of latency
474
+ start_with_config ( LatencyConfig {
475
+ inter_host : ( 100 , 100 ) ,
476
+ ..Default :: default ( )
477
+ } ) ;
478
+
487
479
let sim_addr = SimAddr :: new ( "unix:@dst" . parse :: < ChannelAddr > ( ) . unwrap ( ) ) . unwrap ( ) ;
488
480
let sim_addr_with_src = SimAddr :: new_with_src (
489
481
"unix:@src" . parse :: < ChannelAddr > ( ) . unwrap ( ) ,
490
482
"unix:@dst" . parse :: < ChannelAddr > ( ) . unwrap ( ) ,
491
483
)
492
484
. unwrap ( ) ;
493
- let ( _, mut rx) = sim:: serve :: < ( ) > ( sim_addr. clone ( ) ) . unwrap ( ) ;
494
- let tx = sim:: dial :: < ( ) > ( sim_addr_with_src) . unwrap ( ) ;
495
- let simnet_config_yaml = r#"
496
- edges:
497
- - src: unix:@src
498
- dst: unix:@dst
499
- metadata:
500
- latency: 100
501
- "# ;
502
- update_config ( NetworkConfig :: from_yaml ( simnet_config_yaml) . unwrap ( ) )
503
- . await
504
- . unwrap ( ) ;
485
+ let ( _, mut rx) = sim:: serve :: < MessageEnvelope > ( sim_addr. clone ( ) ) . unwrap ( ) ;
486
+ let tx = sim:: dial :: < MessageEnvelope > ( sim_addr_with_src) . unwrap ( ) ;
487
+
488
+ let controller = id ! ( world[ 0 ] . controller) ;
489
+ let dest = id ! ( world[ 1 ] . dest) ;
490
+ let handle = simnet:: simnet_handle ( ) . unwrap ( ) ;
491
+
492
+ let ext = extent ! ( region = 1 , dc = 1 , zone = 1 , rack = 4 , host = 4 , gpu = 8 ) ;
493
+ handle. register_proc (
494
+ controller. proc_id ( ) . clone ( ) ,
495
+ ext. point ( vec ! [ 0 , 0 , 0 , 0 , 0 , 0 ] ) . unwrap ( ) ,
496
+ ) ;
497
+ handle. register_proc (
498
+ dest. proc_id ( ) . clone ( ) ,
499
+ ext. point ( vec ! [ 0 , 0 , 0 , 0 , 1 , 0 ] ) . unwrap ( ) ,
500
+ ) ;
505
501
506
502
// This message will be delievered at simulator time = 100 seconds
507
- tx. try_post ( ( ) , oneshot:: channel ( ) . 0 ) . unwrap ( ) ;
503
+ tx. try_post (
504
+ MessageEnvelope :: new (
505
+ controller,
506
+ PortId ( dest, 0 ) ,
507
+ Serialized :: serialize ( & 456 ) . unwrap ( ) ,
508
+ Attrs :: new ( ) ,
509
+ ) ,
510
+ oneshot:: channel ( ) . 0 ,
511
+ )
512
+ . unwrap ( ) ;
508
513
{
509
514
// Allow simnet to run
510
515
tokio:: task:: yield_now ( ) . await ;
@@ -524,41 +529,74 @@ mod tests {
524
529
#[ tokio:: test]
525
530
async fn test_client_message_scheduled_realtime ( ) {
526
531
tokio:: time:: pause ( ) ;
527
- start ( ) ;
532
+ // 1 second of latency
533
+ start_with_config ( LatencyConfig {
534
+ inter_host : ( 1000 , 1000 ) ,
535
+ ..Default :: default ( )
536
+ } ) ;
537
+
528
538
let controller_to_dst = SimAddr :: new_with_src (
529
539
"unix:@controller" . parse :: < ChannelAddr > ( ) . unwrap ( ) ,
530
540
"unix:@dst" . parse :: < ChannelAddr > ( ) . unwrap ( ) ,
531
541
)
532
542
. unwrap ( ) ;
533
- let controller_tx = sim:: dial :: < ( ) > ( controller_to_dst. clone ( ) ) . unwrap ( ) ;
543
+
544
+ let controller_tx = sim:: dial :: < MessageEnvelope > ( controller_to_dst. clone ( ) ) . unwrap ( ) ;
534
545
535
546
let client_to_dst = SimAddr :: new_with_client_src (
536
547
"unix:@client" . parse :: < ChannelAddr > ( ) . unwrap ( ) ,
537
548
"unix:@dst" . parse :: < ChannelAddr > ( ) . unwrap ( ) ,
538
549
)
539
550
. unwrap ( ) ;
540
- let client_tx = sim:: dial :: < ( ) > ( client_to_dst) . unwrap ( ) ;
541
-
542
- // 1 second of latency
543
- let simnet_config_yaml = r#"
544
- edges:
545
- - src: unix:@controller
546
- dst: unix:@dst
547
- metadata:
548
- latency: 1
549
- "# ;
550
- update_config ( NetworkConfig :: from_yaml ( simnet_config_yaml) . unwrap ( ) )
551
- . await
552
- . unwrap ( ) ;
551
+ let client_tx = sim:: dial :: < MessageEnvelope > ( client_to_dst) . unwrap ( ) ;
552
+
553
+ let controller = id ! ( world[ 0 ] . controller) ;
554
+ let dest = id ! ( world[ 1 ] . dest) ;
555
+ let client = id ! ( world[ 2 ] . client) ;
556
+
557
+ let handle = simnet:: simnet_handle ( ) . unwrap ( ) ;
558
+ let ext = extent ! ( region = 1 , dc = 1 , zone = 1 , rack = 4 , host = 4 , gpu = 8 ) ;
559
+ handle. register_proc (
560
+ controller. proc_id ( ) . clone ( ) ,
561
+ ext. point ( vec ! [ 0 , 0 , 0 , 0 , 0 , 0 ] ) . unwrap ( ) ,
562
+ ) ;
563
+ handle. register_proc (
564
+ client. proc_id ( ) . clone ( ) ,
565
+ ext. point ( vec ! [ 0 , 0 , 0 , 0 , 0 , 0 ] ) . unwrap ( ) ,
566
+ ) ;
567
+ handle. register_proc (
568
+ dest. proc_id ( ) . clone ( ) ,
569
+ ext. point ( vec ! [ 0 , 0 , 0 , 0 , 1 , 0 ] ) . unwrap ( ) ,
570
+ ) ;
553
571
554
572
assert_eq ! ( SimClock . millis_since_start( RealClock . now( ) ) , 0 ) ;
555
573
// Fast forward real time to 5 seconds
556
574
tokio:: time:: advance ( tokio:: time:: Duration :: from_secs ( 5 ) ) . await ;
557
575
{
558
576
// Send client message
559
- client_tx. try_post ( ( ) , oneshot:: channel ( ) . 0 ) . unwrap ( ) ;
577
+ client_tx
578
+ . try_post (
579
+ MessageEnvelope :: new (
580
+ client. clone ( ) ,
581
+ PortId ( dest. clone ( ) , 0 ) ,
582
+ Serialized :: serialize ( & 456 ) . unwrap ( ) ,
583
+ Attrs :: new ( ) ,
584
+ ) ,
585
+ oneshot:: channel ( ) . 0 ,
586
+ )
587
+ . unwrap ( ) ;
560
588
// Send system message
561
- controller_tx. try_post ( ( ) , oneshot:: channel ( ) . 0 ) . unwrap ( ) ;
589
+ controller_tx
590
+ . try_post (
591
+ MessageEnvelope :: new (
592
+ controller. clone ( ) ,
593
+ PortId ( dest. clone ( ) , 0 ) ,
594
+ Serialized :: serialize ( & 456 ) . unwrap ( ) ,
595
+ Attrs :: new ( ) ,
596
+ ) ,
597
+ oneshot:: channel ( ) . 0 ,
598
+ )
599
+ . unwrap ( ) ;
562
600
// Allow some time for simnet to run
563
601
RealClock . sleep ( tokio:: time:: Duration :: from_secs ( 1 ) ) . await ;
564
602
}
0 commit comments