@@ -47,6 +47,9 @@ extern "C" fn rust_main() {
     tester.run(Command::SemWaitAsync(10_000));
     tester.run(Command::SemWaitSameAsync(10_000));
     tester.run(Command::SemHigh(10_000));
+    tester.run(Command::SemPingPong(10_000));
+    tester.run(Command::SemPingPongAsync(10_000));
+    tester.run(Command::Empty);
 
     printkln!("Done with all tests\n");
     tester.leak();
@@ -63,6 +66,9 @@ struct ThreadTests {
     /// Each test thread gets a semaphore, to use as appropriate for that test.
     sems: Vec<Arc<Semaphore>>,
 
+    /// This semaphore is used to ping-pong back to another thread.
+    back_sems: Vec<Arc<Semaphore>>,
+
     /// Each test also has a message queue for testing; it holds both the sender and receiver.
     chans: Vec<ChanPair<u32>>,
 
@@ -100,6 +106,7 @@ impl ThreadTests {
 
         let mut result = Self {
            sems: Vec::new(),
+            back_sems: Vec::new(),
            chans: Vec::new(),
            commands: Vec::new(),
            results: ChanPair::new_unbounded(),
@@ -114,6 +121,9 @@ impl ThreadTests {
             let sem = Arc::new(Semaphore::new(0, u32::MAX).unwrap());
             result.sems.push(sem.clone());
 
+            let sem = Arc::new(Semaphore::new(0, u32::MAX).unwrap());
+            result.back_sems.push(sem);
+
             let chans = ChanPair::new_bounded(1);
             result.chans.push(chans.clone());
 
@@ -167,6 +177,22 @@ impl ThreadTests {
 
     fn run(&self, command: Command) {
         // printkln!("Running {:?}", command);
+
+        // In case previous runs left a semaphore non-zero, reset all of them. This is safe
+        // because nothing is using the semaphores right now.
+        for sem in &self.sems {
+            if sem.count_get() > 0 {
+                printkln!("Warning: previous test left count: {}", sem.count_get());
+                sem.reset();
+            }
+        }
+        for sem in &self.back_sems {
+            if sem.count_get() > 0 {
+                printkln!("Warning: previous test left count: {}", sem.count_get());
+                sem.reset();
+            }
+        }
+
         let start = now();
 
         let mut results = vec![None; self.chans.len()];
@@ -257,6 +283,10 @@ impl ThreadTests {
                 this.sem_take(&this.sems[id], count, &mut total);
             }
 
+            Command::SemPingPong(count) => {
+                this.ping_pong_worker(id, &this.sems[id], &this.back_sems[id], count, &mut total);
+            }
+
             // For the async commands, spawn this on the worker thread and don't reply
             // ourselves.
             Command::SimpleSemAsync(count) => {
@@ -297,11 +327,39 @@ impl ThreadTests {
                     &this.workq,
                     c"worker",
                 );
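+                // Only spawn the giver from worker 0; it gives to all of the semaphores.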
+                if id == 0 {
+                    spawn(
+                        Self::sem_giver_async(this.clone(), this.sems.clone(), count),
+                        &this.workq,
+                        c"giver",
+                    );
+                }
+                continue;
+            }
+
+            Command::SemPingPongAsync(count) => {
                 spawn(
-                    Self::sem_giver_async(this.clone(), this.sems.clone(), count),
+                    Self::ping_pong_worker_async(
+                        this.clone(),
+                        id,
+                        this.sems[id].clone(),
+                        this.back_sems[id].clone(),
+                        count,
+                    ),
                     &this.workq,
-                    c"giver",
+                    c"worker",
                 );
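+                // Worker 0 also spawns the single replier that serves every worker thread.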
+                if id == 0 {
+                    spawn(
+                        Self::ping_pong_replier_async(
+                            this.clone(),
+                            count,
+                        ),
+                        &this.workq,
+                        c"giver",
+                    );
+                }
+
                 continue;
             }
         }
@@ -370,7 +428,51 @@ impl ThreadTests {
 
     async fn sem_take_async(this: Arc<Self>, id: usize, sem: Arc<Semaphore>, count: usize) {
         for _ in 0..count {
+            // Enable this to verify that we are actually blocking.
+            if false {
+                if let Ok(_) = sem.take(NoWait) {
+                    panic!("Semaphore was already available");
+                }
+            }
+            sem.take_async(Forever).await.unwrap();
+        }
+
+        this.results
+            .sender
+            .send_async(Result::Worker { id, count })
+            .await
+            .unwrap();
+    }
+
+    /// Worker side of the ping-pong sem: takes 'sem' and gives 'back_sem'.
+    fn ping_pong_worker(&self, id: usize, sem: &Semaphore, back_sem: &Semaphore, count: usize, total: &mut usize) {
+        for i in 0..count {
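+            // As in sem_take_async, enable this to verify that the take would actually block.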
+            if false {
+                if let Ok(_) = sem.take(NoWait) {
+                    panic!("Semaphore was already available: {} loop:{}", id, i);
+                }
+            }
+            sem.take(Forever).unwrap();
+            back_sem.give();
+            *total += 1;
+        }
+    }
+
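+    /// Async version of ping_pong_worker: awaits 'sem', then gives 'back_sem'.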
+    async fn ping_pong_worker_async(
+        this: Arc<Self>,
+        id: usize,
+        sem: Arc<Semaphore>,
+        back_sem: Arc<Semaphore>,
+        count: usize,
+    ) {
+        for i in 0..count {
+            if false {
+                if let Ok(_) = sem.take(NoWait) {
+                    panic!("Semaphore was already available: {} loop:{}", id, i);
+                }
+            }
             sem.take_async(Forever).await.unwrap();
+            back_sem.give();
         }
 
         this.results
@@ -380,12 +482,33 @@ impl ThreadTests {
             .unwrap();
     }
 
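+    /// Replier side of the ping-pong: gives each worker's 'sem', then waits on its 'back_sem'.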
+    fn ping_pong_replier(&self, count: usize) {
+        for _ in 0..count {
+            for (sem, back) in self.sems.iter().zip(&self.back_sems) {
+                sem.give();
+                back.take(Forever).unwrap();
+            }
+        }
+    }
+
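+    /// Async replier: like ping_pong_replier, but awaits the back semaphores.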
+    async fn ping_pong_replier_async(this: Arc<Self>, count: usize) {
+        for _ in 0..count {
+            for (sem, back) in this.sems.iter().zip(&this.back_sems) {
+                sem.give();
+                back.take_async(Forever).await.unwrap();
+            }
+        }
+
+        // No reply.
+    }
+
     async fn sem_giver_async(this: Arc<Self>, sems: Vec<Arc<Semaphore>>, count: usize) {
         for _ in 0..count {
             for sem in &sems {
                 sem.give();
-                // Yield after each, forcing us back into the work queue, to allow the workers to
-                // run, and block.
+
+                // Yield after each give. This should only force a reschedule after each task's
+                // operation, just enough to make sure everything still blocks.
                 yield_now().await;
             }
         }
@@ -415,6 +538,10 @@ impl ThreadTests {
             }
             Command::SemWaitSameAsync(_) => (),
             Command::SemHigh(_) => (),
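+            // The low-priority thread acts as the replier for the sync ping-pong test.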
+            Command::SemPingPong(count) => {
+                this.ping_pong_replier(count);
+            }
+            Command::SemPingPongAsync(_) => (),
         }
         // printkln!("low command: {:?}", cmd);
 
@@ -434,6 +561,7 @@ impl ThreadTests {
             Command::SemWait(_) => (),
             Command::SemWaitAsync(_) => (),
             Command::SemWaitSameAsync(_) => (),
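+            // The replies for the sync ping-pong come from the low-priority thread, not here.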
+            Command::SemPingPong(_) => (),
             Command::SemHigh(count) => {
                 // The high-priority thread does all of the gives; this should cause every single
                 // semaphore operation to be ready.
@@ -443,6 +571,7 @@ impl ThreadTests {
                     }
                 }
             }
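+            // The async replier is spawned onto the work queue, so there is nothing to do here.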
+            Command::SemPingPongAsync(_) => (),
         }
         // printkln!("high command: {:?}", cmd);
 
@@ -492,6 +621,10 @@ enum Command {
     /// Semaphore tests where the high priority thread does the 'give', so every wait should be
     /// ready.
     SemHigh(usize),
+    /// Semaphores ping-ponging between worker threads and a low priority thread.
+    SemPingPong(usize),
+    /// SemPingPong, but async.
+    SemPingPongAsync(usize),
 }
 
 enum Result {