@@ -3,7 +3,7 @@ use std::{
3
3
mem,
4
4
pin:: Pin ,
5
5
sync:: {
6
- atomic:: { AtomicBool , AtomicUsize , Ordering } ,
6
+ atomic:: { AtomicBool , Ordering } ,
7
7
Arc ,
8
8
} ,
9
9
task:: { Context , Poll } ,
@@ -43,27 +43,6 @@ pub(crate) struct Conn {
43
43
pub token : Token ,
44
44
}
45
45
46
- static MAX_CONNS : AtomicUsize = AtomicUsize :: new ( 25600 ) ;
47
-
48
- /// Sets the maximum per-worker number of concurrent connections.
49
- ///
50
- /// All socket listeners will stop accepting connections when this limit is
51
- /// reached for each worker.
52
- ///
53
- /// By default max connections is set to a 25k per worker.
54
- pub fn max_concurrent_connections ( num : usize ) {
55
- MAX_CONNS . store ( num, Ordering :: Relaxed ) ;
56
- }
57
-
58
- thread_local ! {
59
- static MAX_CONNS_COUNTER : Counter =
60
- Counter :: new( MAX_CONNS . load( Ordering :: Relaxed ) ) ;
61
- }
62
-
63
- pub ( crate ) fn num_connections ( ) -> usize {
64
- MAX_CONNS_COUNTER . with ( |conns| conns. total ( ) )
65
- }
66
-
67
46
// a handle to worker that can send message to worker and share the availability of worker to other
68
47
// thread.
69
48
#[ derive( Clone ) ]
@@ -173,6 +152,7 @@ enum WorkerServiceStatus {
173
152
pub ( crate ) struct ServerWorkerConfig {
174
153
shutdown_timeout : Duration ,
175
154
max_blocking_threads : usize ,
155
+ max_concurrent_connections : usize ,
176
156
}
177
157
178
158
impl Default for ServerWorkerConfig {
@@ -182,6 +162,7 @@ impl Default for ServerWorkerConfig {
182
162
Self {
183
163
shutdown_timeout : Duration :: from_secs ( 30 ) ,
184
164
max_blocking_threads,
165
+ max_concurrent_connections : 25600 ,
185
166
}
186
167
}
187
168
}
@@ -191,6 +172,10 @@ impl ServerWorkerConfig {
191
172
self . max_blocking_threads = num;
192
173
}
193
174
175
+ pub ( crate ) fn max_concurrent_connections ( & mut self , num : usize ) {
176
+ self . max_concurrent_connections = num;
177
+ }
178
+
194
179
pub ( crate ) fn shutdown_timeout ( & mut self , dur : Duration ) {
195
180
self . shutdown_timeout = dur;
196
181
}
@@ -218,16 +203,16 @@ impl ServerWorker {
218
203
} )
219
204
. spawn ( async move {
220
205
availability. set ( false ) ;
221
- let mut wrk = MAX_CONNS_COUNTER . with ( move |conns| ServerWorker {
206
+ let mut wrk = ServerWorker {
222
207
rx,
223
208
rx2,
224
209
services : Vec :: new ( ) ,
225
210
availability,
211
+ conns : Counter :: new ( config. max_concurrent_connections ) ,
226
212
factories,
227
213
state : Default :: default ( ) ,
228
214
shutdown_timeout : config. shutdown_timeout ,
229
- conns : conns. clone ( ) ,
230
- } ) ;
215
+ } ;
231
216
232
217
let fut = wrk
233
218
. factories
@@ -383,7 +368,7 @@ impl Future for ServerWorker {
383
368
Pin :: new ( & mut this. rx2 ) . poll_recv ( cx)
384
369
{
385
370
this. availability . set ( false ) ;
386
- let num = num_connections ( ) ;
371
+ let num = this . conns . total ( ) ;
387
372
if num == 0 {
388
373
info ! ( "Shutting down worker, 0 connections" ) ;
389
374
let _ = tx. send ( true ) ;
@@ -450,7 +435,7 @@ impl Future for ServerWorker {
450
435
// Wait for 1 second.
451
436
ready ! ( shutdown. timer. as_mut( ) . poll( cx) ) ;
452
437
453
- if num_connections ( ) == 0 {
438
+ if this . conns . total ( ) == 0 {
454
439
// Graceful shutdown.
455
440
if let WorkerState :: Shutdown ( shutdown) = mem:: take ( & mut this. state ) {
456
441
let _ = shutdown. tx . send ( true ) ;
0 commit comments