@@ -84,13 +84,6 @@ struct Handler {
84
84
/// the byte level protocol parsing details encapsulated in `Connection`.
85
85
connection : Connection ,
86
86
87
- /// Max connection semaphore.
88
- ///
89
- /// When the handler is dropped, a permit is returned to this semaphore. If
90
- /// the listener is waiting for connections to close, it will be notified of
91
- /// the newly available permit and resume accepting connections.
92
- limit_connections : Arc < Semaphore > ,
93
-
94
87
/// Listen for shutdown notifications.
95
88
///
96
89
/// A wrapper around the `broadcast::Receiver` paired with the sender in
@@ -229,18 +222,18 @@ impl Listener {
229
222
loop {
230
223
// Wait for a permit to become available
231
224
//
232
- // `acquire` returns a permit that is bound via a lifetime to the
233
- // semaphore. When the permit value is dropped, it is automatically
234
- // returned to the semaphore. This is convenient in many cases.
235
- // However, in this case, the permit must be returned in a different
236
- // task than it is acquired in (the handler task). To do this, we
237
- // "forget" the permit, which drops the permit value **without**
238
- // incrementing the semaphore's permits. Then, in the handler task
239
- // we manually add a new permit when processing completes.
225
+ // `acquire_owned` returns a permit that is bound to the semaphore.
226
+ // When the permit value is dropped, it is automatically returned
227
+ // to the semaphore.
240
228
//
241
- // `acquire()` returns `Err` when the semaphore has been closed. We
242
- // don't ever close the sempahore, so `unwrap()` is safe.
243
- self . limit_connections . acquire ( ) . await . unwrap ( ) . forget ( ) ;
229
+ // `acquire_owned()` returns `Err` when the semaphore has been
230
+ // closed. We don't ever close the sempahore, so `unwrap()` is safe.
231
+ let permit = self
232
+ . limit_connections
233
+ . clone ( )
234
+ . acquire_owned ( )
235
+ . await
236
+ . unwrap ( ) ;
244
237
245
238
// Accept a new socket. This will attempt to perform error handling.
246
239
// The `accept` method internally attempts to recover errors, so an
@@ -256,11 +249,6 @@ impl Listener {
256
249
// buffers to perform redis protocol frame parsing.
257
250
connection : Connection :: new ( socket) ,
258
251
259
- // The connection state needs a handle to the max connections
260
- // semaphore. When the handler is done processing the
261
- // connection, a permit is added back to the semaphore.
262
- limit_connections : self . limit_connections . clone ( ) ,
263
-
264
252
// Receive shutdown notifications.
265
253
shutdown : Shutdown :: new ( self . notify_shutdown . subscribe ( ) ) ,
266
254
@@ -276,6 +264,9 @@ impl Listener {
276
264
if let Err ( err) = handler. run ( ) . await {
277
265
error ! ( cause = ?err, "connection error" ) ;
278
266
}
267
+ // Move the permit into the task and drop it after completion.
268
+ // This returns the permit back to the semaphore.
269
+ drop ( permit) ;
279
270
} ) ;
280
271
}
281
272
}
@@ -380,19 +371,3 @@ impl Handler {
380
371
Ok ( ( ) )
381
372
}
382
373
}
383
-
384
- impl Drop for Handler {
385
- fn drop ( & mut self ) {
386
- // Add a permit back to the semaphore.
387
- //
388
- // Doing so unblocks the listener if the max number of
389
- // connections has been reached.
390
- //
391
- // This is done in a `Drop` implementation in order to guarantee that
392
- // the permit is added even if the task handling the connection panics.
393
- // If `add_permit` was called at the end of the `run` function and some
394
- // bug causes a panic. The permit would never be returned to the
395
- // semaphore.
396
- self . limit_connections . add_permits ( 1 ) ;
397
- }
398
- }
0 commit comments