28
28
29
29
import java .util .Iterator ;
30
30
import java .util .concurrent .TimeUnit ;
31
+ import java .util .concurrent .atomic .AtomicInteger ;
31
32
import java .util .concurrent .locks .Condition ;
32
- import java .util .concurrent .locks .ReadWriteLock ;
33
+ import java .util .concurrent .locks .Lock ;
34
+ import java .util .concurrent .locks .ReentrantLock ;
33
35
import java .util .concurrent .locks .ReentrantReadWriteLock ;
34
36
import java .util .function .Consumer ;
35
37
import java .util .function .Supplier ;
@@ -162,7 +164,7 @@ public T get() {
162
164
public T get (final long timeout , final TimeUnit timeUnit ) {
163
165
stateAndPermits .throwIfClosedOrPaused ();
164
166
165
- if (!stateAndPermits .acquirePermitFair (timeout , timeUnit )) {
167
+ if (!stateAndPermits .acquirePermit (timeout , timeUnit )) {
166
168
throw new MongoTimeoutException (String .format ("Timeout waiting for a pooled item after %d %s" , timeout , timeUnit ));
167
169
}
168
170
@@ -221,7 +223,7 @@ public void prune() {
221
223
public void ensureMinSize (final int minSize , final Consumer <T > initAndRelease ) {
222
224
stateAndPermits .throwIfClosedOrPaused ();
223
225
while (getCount () < minSize ) {
224
- if (!stateAndPermits .acquirePermitFair (0 , TimeUnit .MILLISECONDS )) {
226
+ if (!stateAndPermits .acquirePermit (0 , TimeUnit .MILLISECONDS )) {
225
227
break ;
226
228
}
227
229
initAndRelease .accept (createNewAndReleasePermitIfFailure ());
@@ -246,7 +248,7 @@ private T createNewAndReleasePermitIfFailure() {
246
248
*/
247
249
@ VisibleForTesting (otherwise = PRIVATE )
248
250
boolean acquirePermit (final long timeout , final TimeUnit timeUnit ) {
249
- return stateAndPermits .acquirePermitFair (timeout , timeUnit );
251
+ return stateAndPermits .acquirePermit (timeout , timeUnit );
250
252
}
251
253
252
254
/**
@@ -330,12 +332,42 @@ static boolean isPoolClosedException(final Throwable e) {
330
332
@ ThreadSafe
331
333
private static final class StateAndPermits {
332
334
private final Supplier <MongoServerUnavailableException > poolClosedExceptionSupplier ;
333
- private final ReadWriteLock lock ;
335
+ private final ReentrantReadWriteLock lock ;
334
336
private final Condition permitAvailableOrClosedOrPausedCondition ;
335
337
private volatile boolean paused ;
336
338
private volatile boolean closed ;
337
339
private final int maxPermits ;
338
340
private volatile int permits ;
341
+ /** When there are not enough available permits to serve all threads requesting a permit, threads are queued and wait on
342
+ * {@link #permitAvailableOrClosedOrPausedCondition}. Because of this waiting, we want threads to acquire the lock fairly,
343
+ * to avoid a situation when some threads are sitting in the queue for a long time while others barge in and acquire
344
+ * the lock without waiting in the queue. Fair locking reduces high percentiles of {@link #acquirePermit(long, TimeUnit)} latencies
345
+ * but reduces its throughput: it makes latencies roughly equally high for everyone, while keeping them lower than the highest
346
+ * latencies with unfair locking. The fair approach is in accordance with the
347
+ * <a href="https://github.com/mongodb/specifications/blob/568093ce7f0e1394cf4952c417e1e7dacc5fef53/source/connection-monitoring-and-pooling/connection-monitoring-and-pooling.rst#waitqueue">
348
+ * connection pool specification</a>.
349
+ * <p>
350
+ * When there are enough available permits to serve all threads requesting a permit, threads still have to acquire the lock,
351
+ * and still are queued, but since they are not waiting on {@link #permitAvailableOrClosedOrPausedCondition},
352
+ * threads spend less time in the queue. This results in having smaller high percentiles
353
+ * of {@link #acquirePermit(long, TimeUnit)} latencies, and we do not want to sacrifice the throughput
354
+ * to further reduce the high percentiles by acquiring the lock fairly.</p>
355
+ * <p>
356
+ * While there is a chance that the expressed reasoning is flawed, it is supported by the results of experiments reported in
357
+ * comments in <a href="https://jira.mongodb.org/browse/JAVA-4452">JAVA-4452</a>.</p>
358
+ * <p>
359
+ * {@link ReentrantReadWriteLock#hasWaiters(Condition)} requires holding the lock to be called, therefore we cannot use it
360
+ * to discriminate between the two cases described above, and we use {@link #waitersEstimate} instead.
361
+ * This approach results in sometimes acquiring a lock unfairly when it should have been acquired fairly, and vice versa.
362
+ * But it appears to be a good enough compromise, that results in having enough throughput when there are enough
363
+ * available permits and tolerable high percentiles of latencies when there are not enough available permits.</p>
364
+ * <p>
365
+ * It may seem viable to use {@link #permits} > 0 as a way to decide that there are likely no waiters,
366
+ * but benchmarking shows that with this approach high percentiles of contended {@link #acquirePermit(long, TimeUnit)} latencies
367
+ * (when the number of threads that use the pool is higher than the maximum pool size) become similar to a situation when no
368
+ * fair locking is used. That is, this approach does not result in the behavior we want.</p>
369
+ */
370
+ private final AtomicInteger waitersEstimate ;
339
371
@ Nullable
340
372
private Supplier <MongoException > causeSupplier ;
341
373
@@ -347,6 +379,7 @@ private static final class StateAndPermits {
347
379
closed = false ;
348
380
this .maxPermits = maxPermits ;
349
381
permits = maxPermits ;
382
+ waitersEstimate = new AtomicInteger ();
350
383
causeSupplier = null ;
351
384
}
352
385
@@ -355,9 +388,7 @@ int permits() {
355
388
}
356
389
357
390
boolean acquirePermitImmediateUnfair () {
358
- if (!lock .writeLock ().tryLock ()) { // unfair
359
- lock .writeLock ().lock ();
360
- }
391
+ lockUnfair (lock .writeLock ());
361
392
try {
362
393
throwIfClosedOrPaused ();
363
394
if (permits > 0 ) {
@@ -373,21 +404,24 @@ boolean acquirePermitImmediateUnfair() {
373
404
}
374
405
375
406
/**
407
+ * This method also emulates the eager {@link InterruptedException} behavior of
408
+ * {@link java.util.concurrent.Semaphore#tryAcquire(long, TimeUnit)}.
409
+ *
376
410
* @param timeout See {@link com.mongodb.internal.Timeout#startNow(long, TimeUnit)}.
377
411
*/
378
- boolean acquirePermitFair (final long timeout , final TimeUnit unit ) throws MongoInterruptedException {
412
+ boolean acquirePermit (final long timeout , final TimeUnit unit ) throws MongoInterruptedException {
379
413
long remainingNanos = unit .toNanos (timeout );
380
- try {
381
- // preserve the eager InterruptedException behavior of `Semaphore.tryAcquire(long, TimeUnit)`
382
- lock .writeLock ().lockInterruptibly ();
383
- } catch (InterruptedException e ) {
384
- throw new MongoInterruptedException (null , e );
414
+ if (waitersEstimate .get () == 0 ) {
415
+ lockInterruptiblyUnfair (lock .writeLock ());
416
+ } else {
417
+ lockInterruptibly (lock .writeLock ());
385
418
}
386
419
try {
387
420
while (permits == 0
388
421
// the absence of short-circuiting is of importance
389
422
& !throwIfClosedOrPaused ()) {
390
423
try {
424
+ waitersEstimate .incrementAndGet ();
391
425
if (timeout < 0 || remainingNanos == Long .MAX_VALUE ) {
392
426
permitAvailableOrClosedOrPausedCondition .await ();
393
427
} else if (remainingNanos >= 0 ) {
@@ -397,6 +431,8 @@ boolean acquirePermitFair(final long timeout, final TimeUnit unit) throws MongoI
397
431
}
398
432
} catch (InterruptedException e ) {
399
433
throw new MongoInterruptedException (null , e );
434
+ } finally {
435
+ waitersEstimate .decrementAndGet ();
400
436
}
401
437
}
402
438
assertTrue (permits > 0 );
@@ -409,7 +445,7 @@ boolean acquirePermitFair(final long timeout, final TimeUnit unit) throws MongoI
409
445
}
410
446
411
447
void releasePermit () {
412
- lock .writeLock (). lock ( );
448
+ lockUnfair ( lock .writeLock ());
413
449
try {
414
450
assertTrue (permits < maxPermits );
415
451
//noinspection NonAtomicOperationOnVolatileField
@@ -421,7 +457,7 @@ void releasePermit() {
421
457
}
422
458
423
459
void pause (final Supplier <MongoException > causeSupplier ) {
424
- lock .writeLock (). lock ( );
460
+ lockUnfair ( lock .writeLock ());
425
461
try {
426
462
if (!paused ) {
427
463
this .paused = true ;
@@ -435,7 +471,7 @@ void pause(final Supplier<MongoException> causeSupplier) {
435
471
436
472
void ready () {
437
473
if (paused ) {
438
- lock .writeLock (). lock ( );
474
+ lockUnfair ( lock .writeLock ());
439
475
try {
440
476
this .paused = false ;
441
477
this .causeSupplier = null ;
@@ -450,7 +486,7 @@ void ready() {
450
486
*/
451
487
boolean close () {
452
488
if (!closed ) {
453
- lock .writeLock (). lock ( );
489
+ lockUnfair ( lock .writeLock ());
454
490
try {
455
491
if (!closed ) {
456
492
closed = true ;
@@ -501,4 +537,45 @@ boolean closed() {
501
537
static String sizeToString (final int size ) {
502
538
return size == INFINITE_SIZE ? "infinite" : Integer .toString (size );
503
539
}
540
+
541
+ static void lockInterruptibly (final Lock lock ) throws MongoInterruptedException {
542
+ try {
543
+ lock .lockInterruptibly ();
544
+ } catch (InterruptedException e ) {
545
+ throw new MongoInterruptedException (null , e );
546
+ }
547
+ }
548
+
549
+ private static void lockInterruptiblyUnfair (final ReentrantReadWriteLock .WriteLock lock ) throws MongoInterruptedException {
550
+ throwIfInterrupted ();
551
+ // `WriteLock.tryLock` is unfair
552
+ if (!lock .tryLock ()) {
553
+ try {
554
+ lock .lockInterruptibly ();
555
+ } catch (InterruptedException e ) {
556
+ Thread .currentThread ().interrupt ();
557
+ throw new MongoInterruptedException (null , new InterruptedException ());
558
+ }
559
+ }
560
+ }
561
+
562
+ static void lockUnfair (final ReentrantLock lock ) {
563
+ // `ReentrantLock.tryLock` is unfair
564
+ if (!lock .tryLock ()) {
565
+ lock .lock ();
566
+ }
567
+ }
568
+
569
+ private static void lockUnfair (final ReentrantReadWriteLock .WriteLock lock ) {
570
+ // `WriteLock.tryLock` is unfair
571
+ if (!lock .tryLock ()) {
572
+ lock .lock ();
573
+ }
574
+ }
575
+
576
+ private static void throwIfInterrupted () throws MongoInterruptedException {
577
+ if (Thread .currentThread ().isInterrupted ()) {
578
+ throw new MongoInterruptedException (null , new InterruptedException ());
579
+ }
580
+ }
504
581
}
0 commit comments