@@ -33,14 +33,14 @@ pub struct PoolConfig {
3333impl Default for PoolConfig {
3434 fn default ( ) -> Self {
3535 Self {
36- max_size : 50 , // 增加连接池大小以支持更多并发
37- min_idle : 10 , // 保持更多空闲连接
38- max_idle_time_ms : 180_000 , // 3分钟 - 减少空闲时间以释放资源更快
39- connection_timeout_ms : 5_000 , // 减少连接超时到5秒
40- retry_delay_ms : 25 , // 减少重试延迟到25ms
41- max_retries : 3 , // 减少重试次数以避免过长等待
42- max_concurrent_requests : 16 , // 增加并发请求限制
43- max_requests_per_second : Some ( 50 .0) , // 增加速率限制
36+ max_size : 64 , // 增加到2的幂次,更好的内存对齐
37+ min_idle : 8 , // 减少最小空闲连接
38+ max_idle_time_ms : 120_000 , // 2分钟 - 进一步减少空闲时间
39+ connection_timeout_ms : 3_000 , // 减少连接超时到3秒
40+ retry_delay_ms : 10 , // 减少重试延迟到10ms
41+ max_retries : 2 , // 减少重试次数到2次
42+ max_concurrent_requests : 32 , // 增加并发请求限制到2的幂次
43+ max_requests_per_second : Some ( 100 .0) , // 增加速率限制
4444 }
4545 }
4646}
@@ -124,6 +124,8 @@ struct ConnectionPoolInner {
124124 semaphore : Semaphore ,
125125 /// 专用于PUT请求的新连接缓存
126126 fresh_connections : Mutex < VecDeque < LocalSocketStream > > ,
127+ /// 快速路径计数器,用于避免semaphore竞争
128+ active_connections : std:: sync:: atomic:: AtomicUsize ,
127129}
128130
129131impl ConnectionPoolInner {
@@ -133,6 +135,7 @@ impl ConnectionPoolInner {
133135 semaphore : Semaphore :: new ( config. max_size ) ,
134136 connections : Mutex :: new ( VecDeque :: new ( ) ) ,
135137 fresh_connections : Mutex :: new ( VecDeque :: new ( ) ) ,
138+ active_connections : std:: sync:: atomic:: AtomicUsize :: new ( 0 ) ,
136139 config,
137140 }
138141 }
@@ -203,12 +206,13 @@ impl ConnectionPoolInner {
203206 async fn create_connection ( & self ) -> Result < LocalSocketStream > {
204207 let mut last_error = None ;
205208 let mut delay = self . config . retry_delay ( ) ;
209+ let max_delay = Duration :: from_millis ( 200 ) ; // 限制最大延迟为200ms
206210
207211 for attempt in 0 ..self . config . max_retries {
208212 if attempt > 0 {
209- // Exponential backoff retry delay
213+ // 优化的指数退避,避免过长的延迟
210214 tokio:: time:: sleep ( delay) . await ;
211- delay = std:: cmp:: min ( delay * 2 , Duration :: from_millis ( 1000 ) ) ;
215+ delay = std:: cmp:: min ( delay * 2 , max_delay ) ;
212216 }
213217
214218 match LocalSocketStream :: connect ( self . name . clone ( ) ) . await {
@@ -254,6 +258,9 @@ impl ConnectionPoolInner {
254258 fn return_connection ( & self , stream : LocalSocketStream ) {
255259 let mut connections = self . connections . lock ( ) ;
256260
261+ // 减少活跃连接计数
262+ self . active_connections . fetch_sub ( 1 , std:: sync:: atomic:: Ordering :: Relaxed ) ;
263+
257264 // Only keep the connection if we haven't exceeded max_size
258265 if connections. len ( ) < self . config . max_size {
259266 connections. push_back ( ( stream, Instant :: now ( ) ) ) ;
@@ -264,36 +271,61 @@ impl ConnectionPoolInner {
264271 }
265272
266273 async fn get_connection_with_timeout ( & self ) -> Result < LocalSocketStream > {
267- // Try to get a permit within the timeout
268- let permit =
269- tokio:: time:: timeout ( self . config . connection_timeout ( ) , self . semaphore . acquire ( ) )
270- . await
271- . map_err ( |_| {
272- KodeBridgeError :: timeout ( self . config . connection_timeout ( ) . as_millis ( ) as u64 )
273- } ) ?
274- . map_err ( |_| KodeBridgeError :: custom ( "Semaphore closed" ) ) ?;
275-
276- // Try to get an existing connection first
274+ // 优化的获取连接逻辑,减少semaphore竞争
275+
276+ // 首先快速检查是否有可用的池化连接
277+ if let Some ( stream) = self . get_pooled_connection ( ) {
278+ return Ok ( stream) ;
279+ }
280+
281+ // 检查活跃连接数,避免不必要的semaphore等待
282+ let active_count = self . active_connections . load ( std:: sync:: atomic:: Ordering :: Relaxed ) ;
283+ if active_count >= self . config . max_size {
284+ // 快速失败路径,避免长时间等待
285+ return Err ( KodeBridgeError :: custom ( "Connection pool exhausted" ) ) ;
286+ }
287+
288+ // 使用更短的超时来获取许可
289+ let timeout = std:: cmp:: min ( self . config . connection_timeout ( ) , Duration :: from_millis ( 500 ) ) ;
290+ let permit = tokio:: time:: timeout ( timeout, self . semaphore . acquire ( ) )
291+ . await
292+ . map_err ( |_| {
293+ KodeBridgeError :: timeout ( timeout. as_millis ( ) as u64 )
294+ } ) ?
295+ . map_err ( |_| KodeBridgeError :: custom ( "Semaphore closed" ) ) ?;
296+
297+ // 再次检查池化连接(避免不必要的连接创建)
277298 if let Some ( stream) = self . get_pooled_connection ( ) {
278299 permit. forget ( ) ; // Release the permit since we're using a pooled connection
279300 return Ok ( stream) ;
280301 }
281302
282- // Create a new connection
283- let stream = self . create_connection ( ) . await ?;
284- permit. forget ( ) ; // Release the permit
285- Ok ( stream)
303+ // 增加活跃连接计数
304+ self . active_connections . fetch_add ( 1 , std:: sync:: atomic:: Ordering :: Relaxed ) ;
305+
306+ // 创建新连接
307+ match self . create_connection ( ) . await {
308+ Ok ( stream) => {
309+ permit. forget ( ) ; // Release the permit
310+ Ok ( stream)
311+ }
312+ Err ( e) => {
313+ // 出错时减少活跃连接计数
314+ self . active_connections . fetch_sub ( 1 , std:: sync:: atomic:: Ordering :: Relaxed ) ;
315+ Err ( e)
316+ }
317+ }
286318 }
287319
288320 /// Get a fresh connection optimized for PUT requests
289321 async fn get_fresh_connection_with_timeout ( & self ) -> Result < LocalSocketStream > {
290- // Try to get a permit within a shorter timeout for PUT requests
291- let permit = tokio:: time:: timeout ( Duration :: from_millis ( 50 ) , self . semaphore . acquire ( ) )
322+ // 对PUT请求使用专门优化的逻辑
323+ let permit = tokio:: time:: timeout ( Duration :: from_millis ( 100 ) , self . semaphore . acquire ( ) )
292324 . await
293- . map_err ( |_| KodeBridgeError :: timeout ( 50 ) ) ?
325+ . map_err ( |_| KodeBridgeError :: timeout ( 100 ) ) ?
294326 . map_err ( |_| KodeBridgeError :: custom ( "Semaphore closed" ) ) ?;
295327
296- // Get fresh connection directly
328+ // Get fresh connection directly with optimized parameters
297329 let stream = self . get_fresh_connection ( ) . await ?;
298330 permit. forget ( ) ; // Release the permit
299331 Ok ( stream)
@@ -362,10 +394,12 @@ impl ConnectionPool {
362394 /// Get pool statistics
363395 pub fn stats ( & self ) -> PoolStats {
364396 let connections = self . inner . connections . lock ( ) ;
397+ let active_count = self . inner . active_connections . load ( std:: sync:: atomic:: Ordering :: Relaxed ) ;
365398 PoolStats {
366399 total_connections : connections. len ( ) ,
367400 available_permits : self . inner . semaphore . available_permits ( ) ,
368401 max_size : self . inner . config . max_size ,
402+ active_connections : active_count,
369403 }
370404 }
371405
@@ -383,14 +417,15 @@ pub struct PoolStats {
383417 pub total_connections : usize ,
384418 pub available_permits : usize ,
385419 pub max_size : usize ,
420+ pub active_connections : usize ,
386421}
387422
388423impl std:: fmt:: Display for PoolStats {
389424 fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
390425 write ! (
391426 f,
392- "Pool(connections: {}, permits: {}, max: {})" ,
393- self . total_connections, self . available_permits, self . max_size
427+ "Pool(connections: {}, active: {}, permits: {}, max: {})" ,
428+ self . total_connections, self . active_connections , self . available_permits, self . max_size
394429 )
395430 }
396431}
0 commit comments