Skip to content

Commit e8e62d2

Browse files
committed
Fix bulkhead race condition
Previous implementation made it possible for permit to be acquired, but client code getting `BulkheadFullException` resulting in permit leak.
1 parent 9ab1ea2 commit e8e62d2

File tree

2 files changed

+64
-4
lines changed

2 files changed

+64
-4
lines changed

core/src/main/java/dev/failsafe/internal/BulkheadExecutor.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import dev.failsafe.spi.Scheduler;
2525

2626
import java.time.Duration;
27+
import java.util.concurrent.CancellationException;
2728
import java.util.concurrent.CompletableFuture;
2829
import java.util.concurrent.Future;
2930
import java.util.concurrent.TimeUnit;
@@ -62,22 +63,33 @@ protected CompletableFuture<ExecutionResult<R>> preExecuteAsync(Scheduler schedu
6263
CompletableFuture<ExecutionResult<R>> promise = new CompletableFuture<>();
6364
CompletableFuture<Void> acquireFuture = bulkhead.acquirePermitAsync();
6465
acquireFuture.whenComplete((result, error) -> {
65-
// Signal for execution to proceed
66-
promise.complete(ExecutionResult.none());
66+
if (error instanceof CancellationException) {
67+
// Cancellation of acquireFuture future means either cancellation in the scheduler (in which case we probably
68+
// do not care about the result too much), or cancellation because we reached maxWaitTime (see below) - in which case
69+
// we want to inform the user with BulkheadFullException.
70+
promise.complete(ExecutionResult.exception(new BulkheadFullException(bulkhead)));
71+
} else {
72+
// Signal for execution to proceed
73+
promise.complete(ExecutionResult.none());
74+
}
6775
});
6876

6977
if (!promise.isDone()) {
7078
try {
7179
// Schedule bulkhead permit timeout
7280
Future<?> timeoutFuture = scheduler.schedule(() -> {
73-
promise.complete(ExecutionResult.exception(new BulkheadFullException(bulkhead)));
81+
// Note: we cannot call `promise.complete` here directly. Doing so would result in a following race condition:
82+
// * `promise` would be considered failed (i.e. caller would think thar no permit was acquired)
83+
// * but some other thread may release permit before we call `acquireFuture.cancel` - resulting in `acquireFuture` being completed
84+
// successfully (and permit acquired). But since `promise` is already completed that fact is ignored.
85+
// This discrepancy would lead to permits 'leaking'. So instead we make `acquireFuture.whenComplete` be the only way
86+
// to complete `promise` and here we just signal to that code that `promise` should return BulkheadFullException.
7487
acquireFuture.cancel(true);
7588
return null;
7689
}, maxWaitTime.toNanos(), TimeUnit.NANOSECONDS);
7790

7891
// Propagate outer cancellations to the promise, bulkhead acquire future, and timeout future
7992
future.setCancelFn(this, (mayInterrupt, cancelResult) -> {
80-
promise.complete(cancelResult);
8193
acquireFuture.cancel(mayInterrupt);
8294
timeoutFuture.cancel(mayInterrupt);
8395
});

core/src/test/java/dev/failsafe/functional/BulkheadTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,14 @@
2525
import org.testng.annotations.Test;
2626

2727
import java.time.Duration;
28+
import java.util.ArrayList;
29+
import java.util.List;
2830
import java.util.concurrent.CompletableFuture;
31+
import java.util.concurrent.CompletionException;
2932
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.atomic.AtomicInteger;
34+
35+
import static org.testng.Assert.assertTrue;
3036

3137
/**
3238
* Tests various Bulkhead scenarios.
@@ -96,4 +102,46 @@ public void testMaxWaitTimeExceeded() {
96102
testRunFailure(Failsafe.with(bulkhead), ctx -> {
97103
}, BulkheadFullException.class);
98104
}
105+
106+
@Test
107+
public void testPermitsLeak() throws InterruptedException {
108+
// We verify against leak of permits because of a race condition that only happens when maxWaitTime is not zero.
109+
Bulkhead<Object> bulkhead = Bulkhead.builder(1).withMaxWaitTime(Duration.ofMillis(1)).build();
110+
FailsafeExecutor<Object> failsafe = Failsafe.with(bulkhead);
111+
112+
AtomicInteger errors = new AtomicInteger();
113+
List<Thread> threads = new ArrayList<>();
114+
for (int i = 0; i < 30; i++) {
115+
threads.add(new Thread(() -> {
116+
for (int j = 0; j < 30; j++) {
117+
try {
118+
failsafe.getStageAsync(() -> {
119+
try {
120+
Thread.sleep(10);
121+
} catch (InterruptedException e) {
122+
log(getClass(), "Interrupted sleep", e);
123+
}
124+
return null;
125+
}).join(); // Submit work to the bulkhead
126+
} catch (CompletionException e) {
127+
errors.incrementAndGet();
128+
}
129+
}
130+
}));
131+
}
132+
133+
// Start and join the threads
134+
threads.forEach(Thread::start);
135+
for (Thread t : threads) {
136+
t.join();
137+
}
138+
139+
// Wait for the executor to finish all work. 250ms is plenty of time to finish the work submitted above.
140+
Thread.sleep(250);
141+
142+
// Make sure this run doesn't fail
143+
failsafe.getStageAsync(() -> null).join();
144+
145+
assertTrue(errors.get() > 0, "Should have some errors because maxWaitTime is very small");
146+
}
99147
}

0 commit comments

Comments
 (0)