Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions core/src/main/java/dev/failsafe/internal/BulkheadExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import dev.failsafe.spi.Scheduler;

import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -62,22 +63,33 @@ protected CompletableFuture<ExecutionResult<R>> preExecuteAsync(Scheduler schedu
CompletableFuture<ExecutionResult<R>> promise = new CompletableFuture<>();
CompletableFuture<Void> acquireFuture = bulkhead.acquirePermitAsync();
acquireFuture.whenComplete((result, error) -> {
// Signal for execution to proceed
promise.complete(ExecutionResult.none());
if (error instanceof CancellationException) {
// Cancellation of acquireFuture future means either cancellation in the scheduler (in which case we probably
// do not care about the result too much), or cancellation because we reached maxWaitTime (see below) - in which case
// we want to inform the user with BulkheadFullException.
promise.complete(ExecutionResult.exception(new BulkheadFullException(bulkhead)));
} else {
// Signal for execution to proceed
promise.complete(ExecutionResult.none());
}
});

if (!promise.isDone()) {
try {
// Schedule bulkhead permit timeout
Future<?> timeoutFuture = scheduler.schedule(() -> {
promise.complete(ExecutionResult.exception(new BulkheadFullException(bulkhead)));
// Note: we cannot call `promise.complete` here directly. Doing so would result in a following race condition:
// * `promise` would be considered failed (i.e. caller would think thar no permit was acquired)
// * but some other thread may release permit before we call `acquireFuture.cancel` - resulting in `acquireFuture` being completed
// successfully (and permit acquired). But since `promise` is already completed that fact is ignored.
// This discrepancy would lead to permits 'leaking'. So instead we make `acquireFuture.whenComplete` be the only way
// to complete `promise` and here we just signal to that code that `promise` should return BulkheadFullException.
acquireFuture.cancel(true);
return null;
}, maxWaitTime.toNanos(), TimeUnit.NANOSECONDS);

// Propagate outer cancellations to the promise, bulkhead acquire future, and timeout future
future.setCancelFn(this, (mayInterrupt, cancelResult) -> {
promise.complete(cancelResult);
acquireFuture.cancel(mayInterrupt);
timeoutFuture.cancel(mayInterrupt);
});
Expand Down
30 changes: 23 additions & 7 deletions core/src/main/java/dev/failsafe/internal/BulkheadImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,30 @@ synchronized CompletableFuture<Void> acquirePermitAsync() {

@Override
public synchronized void releasePermit() {
if (permits < maxPermits) {
permits += 1;
CompletableFuture<Void> future = futures.pollFirst();
if (future != null){
permits -= 1;
future.complete(null);
if (permits < maxPermits) {
permits += 1;
/*
* It is possible to get future from the list that already had been completed. This
* happens because setting future to 'completed' state happens before (and not
* atomically with) removing future from the list. Handle this by pulling futures from
* the list until we find one we can complete (or reach the end of the list). Not doing
* this may result in 'dandling' messages in the list that are never completed. For some
* details see FutureLinkedList.add - how it returns a future that weill remove entry
* from the list when it is completed. And also see BulkheadExecutor.preExecuteAsync
* that calls acquirePermitAsync and gets that future in response.
*/
while (true) {
CompletableFuture<Void> future = futures.pollFirst();
if (future == null) {
break;
}
permits -= 1;
if (future.complete(null)) {
break;
}
permits += 1;
}
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ public synchronized CompletableFuture<Void> pollFirst() {
return previousHead == null ? null : previousHead.future;
}

/*
* This looks dodgy: we are 'leaking' reference to the node object via future.whenComplete, so
* this can end up being called for a node that has already been removed. This could have caused
* problems. But in reality it currently would not because the only way to remove a node is via
* pollFirst - i.e. by polling from the head of the list. This means that if node passed to this
* function has already been removed it would imply that it's 'previous' field is always null
* (it was in the head of the list before removal). And it's 'next' points to current head of
* the list, so when we replace node.next.previous with node.previous we always replace null
* with null. This whole assumption would break it this list allowed to add from the head of the
* list, or remove from the tail or middle. So this is somewhat fragile, but currently seems to
* be working fine.
*/
private synchronized void remove(Node node) {
if (node.previous != null)
node.previous.next = node.next;
Expand Down
48 changes: 48 additions & 0 deletions core/src/test/java/dev/failsafe/functional/BulkheadTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,14 @@
import org.testng.annotations.Test;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.testng.Assert.assertTrue;

/**
* Tests various Bulkhead scenarios.
Expand Down Expand Up @@ -96,4 +102,46 @@ public void testMaxWaitTimeExceeded() {
testRunFailure(Failsafe.with(bulkhead), ctx -> {
}, BulkheadFullException.class);
}

@Test
public void testPermitsLeak() throws InterruptedException {
// We verify against leak of permits because of a race condition that only happens when maxWaitTime is not zero.
Bulkhead<Object> bulkhead = Bulkhead.builder(1).withMaxWaitTime(Duration.ofMillis(1)).build();
FailsafeExecutor<Object> failsafe = Failsafe.with(bulkhead);

AtomicInteger errors = new AtomicInteger();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 30; i++) {
threads.add(new Thread(() -> {
for (int j = 0; j < 30; j++) {
try {
failsafe.getStageAsync(() -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
log(getClass(), "Interrupted sleep", e);
}
return null;
}).join(); // Submit work to the bulkhead
} catch (CompletionException e) {
errors.incrementAndGet();
}
}
}));
}

// Start and join the threads
threads.forEach(Thread::start);
for (Thread t : threads) {
t.join();
}

// Wait for the executor to finish all work. 250ms is plenty of time to finish the work submitted above.
Thread.sleep(250);

// Make sure this run doesn't fail
failsafe.getStageAsync(() -> null).join();

assertTrue(errors.get() > 0, "Should have some errors because maxWaitTime is very small");
}
}