1
- use std:: sync:: atomic:: { AtomicUsize , Ordering :: Relaxed } ;
1
+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
2
2
use std:: sync:: { mpsc, Arc } ;
3
3
use std:: { net, thread, time} ;
4
4
@@ -169,7 +169,7 @@ fn test_configure() {
169
169
rt. service ( "addr1" , fn_service ( |_| ok :: < _ , ( ) > ( ( ) ) ) ) ;
170
170
rt. service ( "addr3" , fn_service ( |_| ok :: < _ , ( ) > ( ( ) ) ) ) ;
171
171
rt. on_start ( lazy ( move |_| {
172
- let _ = num. fetch_add ( 1 , Relaxed ) ;
172
+ let _ = num. fetch_add ( 1 , Ordering :: Relaxed ) ;
173
173
} ) )
174
174
} )
175
175
} )
@@ -187,7 +187,80 @@ fn test_configure() {
187
187
assert ! ( net:: TcpStream :: connect( addr1) . is_ok( ) ) ;
188
188
assert ! ( net:: TcpStream :: connect( addr2) . is_ok( ) ) ;
189
189
assert ! ( net:: TcpStream :: connect( addr3) . is_ok( ) ) ;
190
- assert_eq ! ( num. load( Relaxed ) , 1 ) ;
190
+ assert_eq ! ( num. load( Ordering :: Relaxed ) , 1 ) ;
191
191
sys. stop ( ) ;
192
192
let _ = h. join ( ) ;
193
193
}
194
+
195
+ #[ actix_rt:: test]
196
+ async fn test_max_concurrent_connections ( ) {
197
+ // Note:
198
+ // A tcp listener would accept connects based on it's backlog setting.
199
+ //
200
+ // The limit test on the other hand is only for concurrent tcp stream limiting a work
201
+ // thread accept.
202
+
203
+ use actix_rt:: net:: TcpStream ;
204
+ use tokio:: io:: AsyncWriteExt ;
205
+
206
+ let addr = unused_addr ( ) ;
207
+ let ( tx, rx) = mpsc:: channel ( ) ;
208
+
209
+ let counter = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
210
+ let counter_clone = counter. clone ( ) ;
211
+
212
+ let max_conn = 3 ;
213
+
214
+ let h = thread:: spawn ( move || {
215
+ actix_rt:: System :: new ( ) . block_on ( async {
216
+ let server = Server :: build ( )
217
+ // Set a relative higher backlog.
218
+ . backlog ( 12 )
219
+ // max connection for a worker is 3.
220
+ . maxconn ( max_conn)
221
+ . workers ( 1 )
222
+ . disable_signals ( )
223
+ . bind ( "test" , addr, move || {
224
+ let counter = counter. clone ( ) ;
225
+ fn_service ( move |_io : TcpStream | {
226
+ let counter = counter. clone ( ) ;
227
+ async move {
228
+ counter. fetch_add ( 1 , Ordering :: SeqCst ) ;
229
+ actix_rt:: time:: sleep ( time:: Duration :: from_secs ( 20 ) ) . await ;
230
+ counter. fetch_sub ( 1 , Ordering :: SeqCst ) ;
231
+ Ok :: < ( ) , ( ) > ( ( ) )
232
+ }
233
+ } )
234
+ } ) ?
235
+ . run ( ) ;
236
+
237
+ let _ = tx. send ( ( server. clone ( ) , actix_rt:: System :: current ( ) ) ) ;
238
+
239
+ server. await
240
+ } )
241
+ } ) ;
242
+
243
+ let ( srv, sys) = rx. recv ( ) . unwrap ( ) ;
244
+
245
+ let mut conns = vec ! [ ] ;
246
+
247
+ for _ in 0 ..12 {
248
+ let conn = tokio:: net:: TcpStream :: connect ( addr) . await . unwrap ( ) ;
249
+ conns. push ( conn) ;
250
+ }
251
+
252
+ actix_rt:: time:: sleep ( time:: Duration :: from_secs ( 5 ) ) . await ;
253
+
254
+ // counter would remain at 3 even with 12 successful connection.
255
+ // and 9 of them remain in backlog.
256
+ assert_eq ! ( max_conn, counter_clone. load( Ordering :: SeqCst ) ) ;
257
+
258
+ for mut conn in conns {
259
+ conn. shutdown ( ) . await . unwrap ( ) ;
260
+ }
261
+
262
+ srv. stop ( false ) . await ;
263
+
264
+ sys. stop ( ) ;
265
+ let _ = h. join ( ) . unwrap ( ) ;
266
+ }
0 commit comments