22// SPDX-License-Identifier: Apache-2.0
33
44use crate :: metrics:: TIMER ;
5- use aptos_experimental_runtimes:: thread_manager:: THREAD_MANAGER ;
65use aptos_infallible:: Mutex ;
76use aptos_metrics_core:: TimerHelper ;
87use std:: sync:: mpsc:: { channel, Receiver , Sender } ;
8+ use threadpool:: ThreadPool ;
99
1010/// A helper to send things to a thread pool for asynchronous dropping.
1111///
@@ -17,20 +17,24 @@ pub struct AsyncConcurrentDropper {
1717 name : & ' static str ,
1818 token_tx : Sender < ( ) > ,
1919 token_rx : Mutex < Receiver < ( ) > > ,
20+ /// use dedicated threadpool to minimize the possibility of dead lock
21+ thread_pool : ThreadPool ,
2022}
2123
2224impl AsyncConcurrentDropper {
23- pub fn new ( name : & ' static str , max_concurrent_drops : usize ) -> Self {
25+ pub fn new ( name : & ' static str , max_async_drops : usize , num_threads : usize ) -> Self {
2426 let ( token_tx, token_rx) = channel ( ) ;
25- for _ in 0 ..max_concurrent_drops {
27+ for _ in 0 ..max_async_drops {
2628 token_tx
2729 . send ( ( ) )
2830 . expect ( "DropHelper: Failed to buffer initial tokens." ) ;
2931 }
32+ let thread_pool = ThreadPool :: new ( num_threads) ;
3033 Self {
3134 name,
3235 token_tx,
3336 token_rx : Mutex :: new ( token_rx) ,
37+ thread_pool,
3438 }
3539 }
3640
@@ -51,7 +55,7 @@ impl AsyncConcurrentDropper {
5155
5256 let token_tx = self . token_tx . clone ( ) ;
5357 let name = self . name ;
54- THREAD_MANAGER . get_non_exe_cpu_pool ( ) . spawn ( move || {
58+ self . thread_pool . execute ( move || {
5559 let _timer = TIMER . timer_with ( & [ name, "real_drop" ] ) ;
5660
5761 drop ( v) ;
@@ -74,32 +78,43 @@ mod tests {
7478
7579 impl Drop for SlowDropper {
7680 fn drop ( & mut self ) {
77- sleep ( Duration :: from_secs ( 1 ) ) ;
81+ sleep ( Duration :: from_millis ( 200 ) ) ;
7882 }
7983 }
8084
8185 #[ test]
82- fn test_concurrency_limit_hit ( ) {
83- let s = AsyncConcurrentDropper :: new ( "test" , 8 ) ;
86+ fn test_within_concurrency_limit ( ) {
87+ let s = AsyncConcurrentDropper :: new ( "test" , 8 , 4 ) ;
8488 let now = std:: time:: Instant :: now ( ) ;
85- let rx = s. schedule_drop_with_waiter ( SlowDropper ) ;
86- for _ in 1 ..8 {
89+ let rx1 = s. schedule_drop_with_waiter ( SlowDropper ) ; // first round
90+ for _ in 0 ..3 {
91+ s. schedule_drop ( SlowDropper ) ;
92+ }
93+ let rx2 = s. schedule_drop_with_waiter ( SlowDropper ) ; // second round
94+ for _ in 0 ..3 {
8795 s. schedule_drop ( SlowDropper ) ;
8896 }
89- assert ! ( now. elapsed( ) < Duration :: from_millis( 500 ) ) ;
90- rx. recv ( ) . unwrap ( ) ;
91- assert ! ( now. elapsed( ) > Duration :: from_secs( 1 ) ) ;
97+ assert ! ( now. elapsed( ) < Duration :: from_millis( 200 ) ) ;
98+ rx1. recv ( ) . unwrap ( ) ;
99+ assert ! ( now. elapsed( ) > Duration :: from_millis( 200 ) ) ;
100+ assert ! ( now. elapsed( ) < Duration :: from_millis( 400 ) ) ;
101+ rx2. recv ( ) . unwrap ( ) ;
102+ assert ! ( now. elapsed( ) > Duration :: from_millis( 400 ) ) ;
103+ assert ! ( now. elapsed( ) < Duration :: from_millis( 600 ) ) ;
92104 }
93105
94106 #[ test]
95- fn test_within_concurrency_limit ( ) {
96- let s = AsyncConcurrentDropper :: new ( "test" , 8 ) ;
107+ fn test_concurrency_limit_hit ( ) {
108+ let s = AsyncConcurrentDropper :: new ( "test" , 8 , 4 ) ;
97109 let now = std:: time:: Instant :: now ( ) ;
98110 for _ in 0 ..8 {
99111 s. schedule_drop ( SlowDropper ) ;
100112 }
101- assert ! ( now. elapsed( ) < Duration :: from_millis( 500 ) ) ;
113+ assert ! ( now. elapsed( ) < Duration :: from_millis( 200 ) ) ;
114+ s. schedule_drop ( SlowDropper ) ;
115+ assert ! ( now. elapsed( ) > Duration :: from_millis( 200 ) ) ;
116+ assert ! ( now. elapsed( ) < Duration :: from_millis( 400 ) ) ;
102117 s. schedule_drop ( SlowDropper ) ;
103- assert ! ( now. elapsed( ) > Duration :: from_secs ( 1 ) ) ;
118+ assert ! ( now. elapsed( ) < Duration :: from_millis ( 400 ) ) ;
104119 }
105120}
0 commit comments