@@ -10,7 +10,7 @@ use async_task::{Builder, Runnable};
10
10
use futures_lite:: { future, prelude:: * } ;
11
11
use slab:: Slab ;
12
12
13
- use crate :: AsyncCallOnDrop ;
13
+ use crate :: { AsyncCallOnDrop , Sleepers } ;
14
14
#[ doc( no_inline) ]
15
15
pub use async_task:: Task ;
16
16
@@ -101,9 +101,7 @@ impl<'a> LocalExecutor<'a> {
101
101
/// ```
102
102
pub fn spawn < T : ' a > ( & self , future : impl Future < Output = T > + ' a ) -> Task < T > {
103
103
let mut active = self . state ( ) . active ( ) ;
104
-
105
- // SAFETY: `T` and the future are `Send`.
106
- unsafe { self . spawn_inner ( future, & mut active) }
104
+ self . spawn_inner ( future, & mut active)
107
105
}
108
106
109
107
/// Spawns many tasks onto the executor.
@@ -151,32 +149,19 @@ impl<'a> LocalExecutor<'a> {
151
149
futures : impl IntoIterator < Item = F > ,
152
150
handles : & mut impl Extend < Task < F :: Output > > ,
153
151
) {
154
- let mut active = Some ( self . state ( ) . active ( ) ) ;
152
+ let mut active = self . state ( ) . active ( ) ;
155
153
156
154
// Convert the futures into tasks.
157
- let tasks = futures. into_iter ( ) . enumerate ( ) . map ( move |( i, future) | {
158
- // SAFETY: `T` and the future are `Send`.
159
- let task = unsafe { self . spawn_inner ( future, active. as_mut ( ) . unwrap ( ) ) } ;
160
-
161
- // Yield the lock every once in a while to ease contention.
162
- if i. wrapping_sub ( 1 ) % 500 == 0 {
163
- drop ( active. take ( ) ) ;
164
- active = Some ( self . state ( ) . active ( ) ) ;
165
- }
166
-
167
- task
155
+ let tasks = futures. into_iter ( ) . map ( move |future| {
156
+ self . spawn_inner ( future, & mut active)
168
157
} ) ;
169
158
170
159
// Push the tasks to the user's collection.
171
160
handles. extend ( tasks) ;
172
161
}
173
162
174
163
/// Spawn a future while holding the inner lock.
175
- ///
176
- /// # Safety
177
- ///
178
- /// If this is an `Executor`, `F` and `T` must be `Send`.
179
- unsafe fn spawn_inner < T : ' a > (
164
+ fn spawn_inner < T : ' a > (
180
165
& self ,
181
166
future : impl Future < Output = T > + ' a ,
182
167
active : & mut Slab < Waker > ,
@@ -191,8 +176,7 @@ impl<'a> LocalExecutor<'a> {
191
176
//
192
177
// SAFETY:
193
178
//
194
- // If `future` is not `Send`, this must be a `LocalExecutor` as per this
195
- // function's unsafe precondition. Since `LocalExecutor` is `!Sync`,
179
+ // `future` may not `Send`. Since `LocalExecutor` is `!Sync`,
196
180
// `try_tick`, `tick` and `run` can only be called from the origin
197
181
// thread of the `LocalExecutor`. Similarly, `spawn` can only be called
198
182
// from the origin thread, ensuring that `future` and the executor share
@@ -206,12 +190,17 @@ impl<'a> LocalExecutor<'a> {
206
190
// the `Executor` is drained of all of its runnables. This ensures that
207
191
// runnables are dropped and this precondition is satisfied.
208
192
//
209
- // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
210
- // Therefore we do not need to worry about what is done with the
211
- // `Waker`.
212
- let ( runnable, task) = Builder :: new ( )
193
+ // `self.schedule()` is not `Send` nor `Sync`. As LocalExecutor is not
194
+ // `Send`, the `Waker` is guaranteed// to only be used on the same thread
195
+ // it was spawned on.
196
+ //
197
+ // `self.schedule()` is `'static`, and thus will outlive all borrowed
198
+ // variables in the future.
199
+ let ( runnable, task) = unsafe {
200
+ Builder :: new ( )
213
201
. propagate_panic ( true )
214
- . spawn_unchecked ( |( ) | future, self . schedule ( ) ) ;
202
+ . spawn_unchecked ( |( ) | future, self . schedule ( ) )
203
+ } ;
215
204
entry. insert ( runnable. waker ( ) ) ;
216
205
217
206
runnable. schedule ( ) ;
@@ -314,19 +303,19 @@ impl<'a> LocalExecutor<'a> {
314
303
/// Returns a reference to the inner state.
315
304
#[ inline]
316
305
fn state ( & self ) -> & State {
317
- // SAFETY: So long as an Executor lives, it's state pointer will always be valid
306
+ // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be valid
318
307
// when accessed through state_ptr.
319
308
unsafe { & * self . state_ptr ( ) }
320
309
}
321
310
322
311
// Clones the inner state Arc
323
312
#[ inline]
324
313
fn state_as_rc ( & self ) -> Rc < State > {
325
- // SAFETY: So long as an Executor lives, it's state pointer will always be a valid
314
+ // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be a valid
326
315
// Arc when accessed through state_ptr.
327
- let arc = unsafe { Rc :: from_raw ( self . state_ptr ( ) ) } ;
328
- let clone = arc . clone ( ) ;
329
- std:: mem:: forget ( arc ) ;
316
+ let rc = unsafe { Rc :: from_raw ( self . state_ptr ( ) ) } ;
317
+ let clone = rc . clone ( ) ;
318
+ std:: mem:: forget ( rc ) ;
330
319
clone
331
320
}
332
321
}
@@ -367,7 +356,7 @@ pub(crate) struct State {
367
356
sleepers : RefCell < Sleepers > ,
368
357
369
358
/// Currently active tasks.
370
- pub ( crate ) active : RefCell < Slab < Waker > > ,
359
+ active : RefCell < Slab < Waker > > ,
371
360
}
372
361
373
362
impl State {
@@ -385,15 +374,14 @@ impl State {
385
374
}
386
375
387
376
/// Returns a reference to currently active tasks.
388
- fn active ( & self ) -> RefMut < ' _ , Slab < Waker > > {
377
+ pub ( crate ) fn active ( & self ) -> RefMut < ' _ , Slab < Waker > > {
389
378
self . active . borrow_mut ( )
390
379
}
391
380
392
381
/// Notifies a sleeping ticker.
393
382
#[ inline]
394
- fn notify ( & self ) {
395
- let waker = self . sleepers . borrow_mut ( ) . notify ( ) ;
396
- if let Some ( w) = waker {
383
+ pub ( crate ) fn notify ( & self ) {
384
+ if let Some ( w) = self . sleepers . borrow_mut ( ) . notify ( ) {
397
385
w. wake ( ) ;
398
386
}
399
387
}
@@ -402,10 +390,6 @@ impl State {
402
390
match self . queue . borrow_mut ( ) . pop_back ( ) {
403
391
None => false ,
404
392
Some ( runnable) => {
405
- // Notify another ticker now to pick up where this ticker left off, just in case
406
- // running the task takes a long time.
407
- self . notify ( ) ;
408
-
409
393
// Run the task.
410
394
runnable. run ( ) ;
411
395
true
@@ -414,20 +398,16 @@ impl State {
414
398
}
415
399
416
400
pub ( crate ) async fn tick ( & self ) {
417
- let runnable = Ticker :: new ( self ) . runnable ( ) . await ;
418
- runnable. run ( ) ;
401
+ Ticker :: new ( self ) . runnable ( ) . await . run ( ) ;
419
402
}
420
403
421
404
pub async fn run < T > ( & self , future : impl Future < Output = T > ) -> T {
422
- let mut runner = Runner :: new ( self ) ;
405
+ let mut ticker = Ticker :: new ( self ) ;
423
406
424
407
// A future that runs tasks forever.
425
408
let run_forever = async {
426
409
loop {
427
- for _ in 0 ..200 {
428
- let runnable = runner. runnable ( ) . await ;
429
- runnable. run ( ) ;
430
- }
410
+ ticker. runnable ( ) . await . run ( ) ;
431
411
future:: yield_now ( ) . await ;
432
412
}
433
413
} ;
@@ -437,75 +417,6 @@ impl State {
437
417
}
438
418
}
439
419
440
- /// A list of sleeping tickers.
441
- struct Sleepers {
442
- /// Number of sleeping tickers (both notified and unnotified).
443
- count : usize ,
444
-
445
- /// IDs and wakers of sleeping unnotified tickers.
446
- ///
447
- /// A sleeping ticker is notified when its waker is missing from this list.
448
- wakers : Vec < ( usize , Waker ) > ,
449
-
450
- /// Reclaimed IDs.
451
- free_ids : Vec < usize > ,
452
- }
453
-
454
- impl Sleepers {
455
- /// Inserts a new sleeping ticker.
456
- fn insert ( & mut self , waker : & Waker ) -> usize {
457
- let id = match self . free_ids . pop ( ) {
458
- Some ( id) => id,
459
- None => self . count + 1 ,
460
- } ;
461
- self . count += 1 ;
462
- self . wakers . push ( ( id, waker. clone ( ) ) ) ;
463
- id
464
- }
465
-
466
- /// Re-inserts a sleeping ticker's waker if it was notified.
467
- ///
468
- /// Returns `true` if the ticker was notified.
469
- fn update ( & mut self , id : usize , waker : & Waker ) -> bool {
470
- for item in & mut self . wakers {
471
- if item. 0 == id {
472
- item. 1 . clone_from ( waker) ;
473
- return false ;
474
- }
475
- }
476
-
477
- self . wakers . push ( ( id, waker. clone ( ) ) ) ;
478
- true
479
- }
480
-
481
- /// Removes a previously inserted sleeping ticker.
482
- ///
483
- /// Returns `true` if the ticker was notified.
484
- fn remove ( & mut self , id : usize ) -> bool {
485
- self . count -= 1 ;
486
- self . free_ids . push ( id) ;
487
-
488
- for i in ( 0 ..self . wakers . len ( ) ) . rev ( ) {
489
- if self . wakers [ i] . 0 == id {
490
- self . wakers . remove ( i) ;
491
- return false ;
492
- }
493
- }
494
- true
495
- }
496
-
497
- /// Returns notification waker for a sleeping ticker.
498
- ///
499
- /// If a ticker was notified already or there are no tickers, `None` will be returned.
500
- fn notify ( & mut self ) -> Option < Waker > {
501
- if self . wakers . len ( ) == self . count {
502
- self . wakers . pop ( ) . map ( |item| item. 1 )
503
- } else {
504
- None
505
- }
506
- }
507
- }
508
-
509
420
/// Runs task one by one.
510
421
struct Ticker < ' a > {
511
422
/// The executor state.
@@ -552,23 +463,16 @@ impl Ticker<'_> {
552
463
/// Moves the ticker into woken state.
553
464
fn wake ( & mut self ) {
554
465
if self . sleeping != 0 {
555
- let mut sleepers = self . state . sleepers . borrow_mut ( ) ;
556
- sleepers. remove ( self . sleeping ) ;
466
+ self . state . sleepers . borrow_mut ( ) . remove ( self . sleeping ) ;
557
467
}
558
468
self . sleeping = 0 ;
559
469
}
560
470
561
- /// Waits for the next runnable task to run.
562
- async fn runnable ( & mut self ) -> Runnable {
563
- self . runnable_with ( || self . state . queue . borrow_mut ( ) . pop_back ( ) )
564
- . await
565
- }
566
-
567
471
/// Waits for the next runnable task to run, given a function that searches for a task.
568
- async fn runnable_with ( & mut self , mut search : impl FnMut ( ) -> Option < Runnable > ) -> Runnable {
472
+ async fn runnable ( & mut self ) -> Runnable {
569
473
future:: poll_fn ( |cx| {
570
474
loop {
571
- match search ( ) {
475
+ match self . state . queue . borrow_mut ( ) . pop_back ( ) {
572
476
None => {
573
477
// Move to sleeping and unnotified state.
574
478
if !self . sleep ( cx. waker ( ) ) {
@@ -580,10 +484,6 @@ impl Ticker<'_> {
580
484
// Wake up.
581
485
self . wake ( ) ;
582
486
583
- // Notify another ticker now to pick up where this ticker left off, just in
584
- // case running the task takes a long time.
585
- self . state . notify ( ) ;
586
-
587
487
return Poll :: Ready ( r) ;
588
488
}
589
489
}
@@ -609,48 +509,6 @@ impl Drop for Ticker<'_> {
609
509
}
610
510
}
611
511
612
- /// A worker in a work-stealing executor.
613
- ///
614
- /// This is just a ticker that also has an associated local queue for improved cache locality.
615
- struct Runner < ' a > {
616
- /// The executor state.
617
- state : & ' a State ,
618
-
619
- /// Inner ticker.
620
- ticker : Ticker < ' a > ,
621
-
622
- /// Bumped every time a runnable task is found.
623
- ticks : usize ,
624
- }
625
-
626
- impl Runner < ' _ > {
627
- /// Creates a runner and registers it in the executor state.
628
- fn new ( state : & State ) -> Runner < ' _ > {
629
- let runner = Runner {
630
- state,
631
- ticker : Ticker :: new ( state) ,
632
- ticks : 0 ,
633
- } ;
634
- runner
635
- }
636
-
637
- /// Waits for the next runnable task to run.
638
- async fn runnable ( & mut self ) -> Runnable {
639
- let runnable = self
640
- . ticker
641
- . runnable_with ( || {
642
- // Try popping from the queue.
643
- self . state . queue . borrow_mut ( ) . pop_back ( )
644
- } )
645
- . await ;
646
-
647
- // Bump the tick counter.
648
- self . ticks = self . ticks . wrapping_add ( 1 ) ;
649
-
650
- runnable
651
- }
652
- }
653
-
654
512
/// Debug implementation for `Executor` and `LocalExecutor`.
655
513
fn debug_executor (
656
514
executor : & LocalExecutor < ' _ > ,
0 commit comments