Skip to content

Commit 7569ff2

Browse files
authored
Fix deadlock and another bug in TaskQueue (#9846)
* fix deadlock * fix compile
1 parent 3bec846 commit 7569ff2

File tree

4 files changed

+38
-9
lines changed

4 files changed

+38
-9
lines changed

infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/LimitedTaskQueue.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package tech.pegasys.teku.infrastructure.async;
1515

16+
import com.google.common.annotations.VisibleForTesting;
1617
import java.util.concurrent.RejectedExecutionException;
1718
import java.util.function.Supplier;
1819
import org.hyperledger.besu.plugin.services.MetricsSystem;
@@ -61,21 +62,24 @@ private LimitedTaskQueue(final TaskQueue delegate, final int maximumQueueSize) {
6162
}
6263

6364
@Override
64-
public synchronized <T> SafeFuture<T> queueTask(final Supplier<SafeFuture<T>> request) {
65-
if (delegate.getQueuedTasksCount() >= maximumQueueSize) {
66-
rejectedTaskCount++;
67-
return SafeFuture.failedFuture(new QueueIsFullException());
65+
public <T> SafeFuture<T> queueTask(final Supplier<SafeFuture<T>> request) {
66+
synchronized (delegate) {
67+
if (delegate.getQueuedTasksCount() >= maximumQueueSize) {
68+
rejectedTaskCount++;
69+
return SafeFuture.failedFuture(new QueueIsFullException());
70+
}
71+
return delegate.queueTask(request);
6872
}
69-
return delegate.queueTask(request);
7073
}
7174

7275
@Override
73-
public synchronized int getQueuedTasksCount() {
76+
public int getQueuedTasksCount() {
7477
return delegate.getQueuedTasksCount();
7578
}
7679

80+
@VisibleForTesting
7781
@Override
78-
public synchronized int getInflightTaskCount() {
82+
public int getInflightTaskCount() {
7983
return delegate.getInflightTaskCount();
8084
}
8185
}

infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/TaskQueue.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public interface TaskQueue {
2121

2222
int getQueuedTasksCount();
2323

24+
/** This must only be used for testing to verify that throttling is working as expected. */
2425
@VisibleForTesting
2526
int getInflightTaskCount();
2627
}

infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueue.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,14 @@ public <T> SafeFuture<T> queueTask(final Supplier<SafeFuture<T>> request) {
6262
protected <T> Runnable getTaskToQueue(
6363
final Supplier<SafeFuture<T>> request, final SafeFuture<T> target) {
6464
return () -> {
65-
final SafeFuture<T> requestFuture = request.get();
65+
final SafeFuture<T> requestFuture;
66+
try {
67+
requestFuture = request.get();
68+
} catch (final Exception e) {
69+
target.completeExceptionally(e);
70+
taskComplete();
71+
return;
72+
}
6673
requestFuture.propagateTo(target);
6774
requestFuture.always(this::taskComplete);
6875
};
@@ -86,7 +93,7 @@ public int getQueuedTasksCount() {
8693

8794
@VisibleForTesting
8895
@Override
89-
public int getInflightTaskCount() {
96+
public synchronized int getInflightTaskCount() {
9097
return inflightTaskCount;
9198
}
9299

infrastructure/async/src/test/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueueTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package tech.pegasys.teku.infrastructure.async;
1515

1616
import static org.assertj.core.api.Assertions.assertThat;
17+
import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.assertThatSafeFuture;
1718

1819
import java.util.List;
1920
import java.util.concurrent.CompletableFuture;
@@ -78,6 +79,22 @@ public void throttlesRequests() {
7879
checkQueueProgress(requests, 0, 0, 10);
7980
}
8081

82+
@Test
83+
public void shouldFailTaskIfSupplierThrows() {
84+
taskQueue = createThrottlingTaskQueue();
85+
86+
final RuntimeException error = new RuntimeException("Test exception");
87+
88+
final SafeFuture<Void> request =
89+
taskQueue.queueTask(
90+
() -> {
91+
throw error;
92+
});
93+
94+
assertThatSafeFuture(request).isCompletedExceptionallyWith(error);
95+
checkQueueProgress(List.of(request), 0, 0, 1);
96+
}
97+
8198
protected void checkQueueProgress(
8299
final List<SafeFuture<Void>> requests,
83100
final int queueSize,

0 commit comments

Comments
 (0)