11//! An executor where you can only push a limited number of tasks.
22
33use async_executor:: { Executor , Task } ;
4- use event_listener:: { Event , EventListener } ;
5- use futures_lite:: pin;
6- use std:: {
7- future:: Future ,
8- sync:: {
9- atomic:: { AtomicUsize , Ordering } ,
10- Arc ,
11- } ,
12- time:: Duration ,
13- } ;
4+ use async_lock:: Semaphore ;
5+ use std:: { future:: Future , sync:: Arc , time:: Duration } ;
146
157/// An executor where you can only push a limited number of tasks.
168struct LimitedExecutor {
179 /// Inner running executor.
1810 executor : Executor < ' static > ,
1911
20- /// Shared state.
21- shared : Arc < SharedState > ,
22- }
23-
24- struct SharedState {
25- /// The maximum number of tasks that can be pushed.
26- max : usize ,
27-
28- /// The current number of active tasks.
29- active : AtomicUsize ,
30-
31- /// Event listeners for when a new task is available.
32- slot_available : Event ,
12+ /// Semaphore limiting the number of tasks.
13+ semaphore : Arc < Semaphore > ,
3314}
3415
3516impl LimitedExecutor {
3617 fn new ( max : usize ) -> Self {
3718 Self {
3819 executor : Executor :: new ( ) ,
39- shared : Arc :: new ( SharedState {
40- max,
41- active : AtomicUsize :: new ( 0 ) ,
42- slot_available : Event :: new ( ) ,
43- } ) ,
20+ semaphore : Semaphore :: new ( max) . into ( ) ,
4421 }
4522 }
4623
@@ -49,67 +26,18 @@ impl LimitedExecutor {
4926 where
5027 F :: Output : Send + ' static ,
5128 {
52- let listener = EventListener :: new ( & self . shared . slot_available ) ;
53- pin ! ( listener) ;
54-
55- // Load the current number of active tasks.
56- let mut active = self . shared . active . load ( Ordering :: Acquire ) ;
57-
58- loop {
59- // Check if there is a slot available.
60- if active < self . shared . max {
61- // Try to set the slot to what would be the new number of tasks.
62- let new_active = active + 1 ;
63- match self . shared . active . compare_exchange (
64- active,
65- new_active,
66- Ordering :: SeqCst ,
67- Ordering :: SeqCst ,
68- ) {
69- Ok ( _) => {
70- // Wrap the future in another future that decrements the active count
71- // when it's done.
72- let future = {
73- let shared = self . shared . clone ( ) ;
74- async move {
75- struct DecOnDrop ( Arc < SharedState > ) ;
76-
77- impl Drop for DecOnDrop {
78- fn drop ( & mut self ) {
79- // Decrement the count and notify someone.
80- self . 0 . active . fetch_sub ( 1 , Ordering :: SeqCst ) ;
81- self . 0 . slot_available . notify ( usize:: MAX ) ;
82- }
83- }
84-
85- let _dec = DecOnDrop ( shared) ;
86- future. await
87- }
88- } ;
89-
90- // Wake up another waiter, in case there is one.
91- self . shared . slot_available . notify ( 1 ) ;
92-
93- // Spawn the task.
94- return self . executor . spawn ( future) ;
95- }
96-
97- Err ( actual) => {
98- // Try again.
99- active = actual;
100- }
101- }
102- } else {
103- // Start waiting for a slot to become available.
104- if listener. as_ref ( ) . is_listening ( ) {
105- listener. as_mut ( ) . await ;
106- } else {
107- listener. as_mut ( ) . listen ( ) ;
108- }
109-
110- active = self . shared . active . load ( Ordering :: Acquire ) ;
111- }
112- }
29+ // Wait for a semaphore permit.
30+ let permit = self . semaphore . acquire_arc ( ) . await ;
31+
32+ // Wrap it into a new future.
33+ let future = async move {
34+ let result = future. await ;
35+ drop ( permit) ;
36+ result
37+ } ;
38+
39+ // Spawn the task.
40+ self . executor . spawn ( future)
11341 }
11442
11543 /// Run a future to completion.
0 commit comments