Skip to content

Commit 98c1ddf

Browse files
authored
Limit rpc storage query queue size (#9828)
1 parent 070b4b6 commit 98c1ddf

File tree

12 files changed

+330
-21
lines changed

12 files changed

+330
-21
lines changed
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright Consensys Software Inc., 2025
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package tech.pegasys.teku.infrastructure.async;
15+
16+
import java.util.concurrent.RejectedExecutionException;
17+
import java.util.function.Supplier;
18+
import org.hyperledger.besu.plugin.services.MetricsSystem;
19+
import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory;
20+
21+
/**
22+
* A TaskQueue that limits the number of queued tasks. If the limit is reached, new tasks will be
23+
* rejected. This is useful to prevent unbounded memory usage when tasks are being added faster than
24+
* they can be processed. Passing a {@link ThrottlingTaskQueue} as the delegate will also limit the
25+
* number of concurrently executing tasks.
26+
*/
27+
public class LimitedTaskQueue implements TaskQueue {
28+
private final TaskQueue delegate;
29+
private final int maximumQueueSize;
30+
private volatile long rejectedTaskCount = 0;
31+
32+
public static class QueueIsFullException extends RejectedExecutionException {
33+
public QueueIsFullException() {
34+
super("Task queue is full");
35+
}
36+
}
37+
38+
public static boolean isQueueIsFullException(final Throwable error) {
39+
return error instanceof QueueIsFullException
40+
|| (error.getCause() != null && isQueueIsFullException(error.getCause()));
41+
}
42+
43+
public static LimitedTaskQueue create(
44+
final TaskQueue delegate,
45+
final int maximumQueueSize,
46+
final MetricsSystem metricsSystem,
47+
final TekuMetricCategory metricCategory,
48+
final String metricName) {
49+
final LimitedTaskQueue limitedQueue = new LimitedTaskQueue(delegate, maximumQueueSize);
50+
metricsSystem.createLongGauge(
51+
metricCategory,
52+
metricName,
53+
"Number of tasks rejected by the queue",
54+
() -> limitedQueue.rejectedTaskCount);
55+
return limitedQueue;
56+
}
57+
58+
private LimitedTaskQueue(final TaskQueue delegate, final int maximumQueueSize) {
59+
this.delegate = delegate;
60+
this.maximumQueueSize = maximumQueueSize;
61+
}
62+
63+
@Override
64+
public synchronized <T> SafeFuture<T> queueTask(final Supplier<SafeFuture<T>> request) {
65+
if (delegate.getQueuedTasksCount() >= maximumQueueSize) {
66+
rejectedTaskCount++;
67+
return SafeFuture.failedFuture(new QueueIsFullException());
68+
}
69+
return delegate.queueTask(request);
70+
}
71+
72+
@Override
73+
public synchronized int getQueuedTasksCount() {
74+
return delegate.getQueuedTasksCount();
75+
}
76+
77+
@Override
78+
public synchronized int getInflightTaskCount() {
79+
return delegate.getInflightTaskCount();
80+
}
81+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright Consensys Software Inc., 2025
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package tech.pegasys.teku.infrastructure.async;
15+
16+
import com.google.common.annotations.VisibleForTesting;
17+
import java.util.function.Supplier;
18+
19+
public interface TaskQueue {
20+
<T> SafeFuture<T> queueTask(Supplier<SafeFuture<T>> request);
21+
22+
int getQueuedTasksCount();
23+
24+
@VisibleForTesting
25+
int getInflightTaskCount();
26+
}

infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueue.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.hyperledger.besu.plugin.services.MetricsSystem;
2323
import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory;
2424

25-
public class ThrottlingTaskQueue {
25+
public class ThrottlingTaskQueue implements TaskQueue {
2626
private static final Logger LOG = LogManager.getLogger();
2727
protected final Queue<Runnable> queuedTasks = new ConcurrentLinkedQueue<>();
2828

@@ -50,6 +50,7 @@ protected ThrottlingTaskQueue(final int maximumConcurrentTasks) {
5050
LOG.debug("Task queue maximum concurrent tasks {}", maximumConcurrentTasks);
5151
}
5252

53+
@Override
5354
public <T> SafeFuture<T> queueTask(final Supplier<SafeFuture<T>> request) {
5455
final SafeFuture<T> target = new SafeFuture<>();
5556
final Runnable taskToQueue = getTaskToQueue(request, target);
@@ -78,12 +79,14 @@ protected synchronized void processQueuedTasks() {
7879
}
7980
}
8081

81-
protected int getQueuedTasksCount() {
82+
@Override
83+
public int getQueuedTasksCount() {
8284
return queuedTasks.size();
8385
}
8486

8587
@VisibleForTesting
86-
int getInflightTaskCount() {
88+
@Override
89+
public int getInflightTaskCount() {
8790
return inflightTaskCount;
8891
}
8992

infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueueWithPriority.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ protected Runnable getTaskToRun() {
6363
}
6464

6565
@Override
66-
protected int getQueuedTasksCount() {
66+
public int getQueuedTasksCount() {
6767
return queuedTasks.size() + queuedPrioritizedTasks.size();
6868
}
6969
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright Consensys Software Inc., 2025
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package tech.pegasys.teku.infrastructure.async;
15+
16+
import static org.assertj.core.api.Assertions.assertThat;
17+
18+
import java.util.List;
19+
import java.util.stream.IntStream;
20+
import org.apache.logging.log4j.LogManager;
21+
import org.apache.logging.log4j.Logger;
22+
import org.junit.jupiter.api.Test;
23+
import tech.pegasys.teku.infrastructure.async.LimitedTaskQueue.QueueIsFullException;
24+
import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory;
25+
26+
public class LimitedThrottlingTaskQueueTest extends ThrottlingTaskQueueTest {
27+
private static final Logger LOG = LogManager.getLogger();
28+
29+
@Override
30+
protected TaskQueue createThrottlingTaskQueue() {
31+
return LimitedTaskQueue.create(
32+
super.createThrottlingTaskQueue(),
33+
15,
34+
stubMetricsSystem,
35+
TekuMetricCategory.BEACON,
36+
"test_rejected_metric");
37+
}
38+
39+
@Test
40+
public void rejectsWhenFull() {
41+
taskQueue = createThrottlingTaskQueue();
42+
43+
final int totalTasks = 20;
44+
final int maxQueueSize = 15;
45+
final int expectedRejected = totalTasks - maxQueueSize - MAXIMUM_CONCURRENT_TASKS;
46+
final int[] rejectedCount = {0};
47+
final List<SafeFuture<Void>> requests =
48+
IntStream.range(0, totalTasks)
49+
.mapToObj(
50+
element ->
51+
taskQueue
52+
.queueTask(
53+
() ->
54+
stubAsyncRunner.runAsync(
55+
() -> {
56+
LOG.info("Running task {}", element);
57+
assertThat(taskQueue.getInflightTaskCount())
58+
.isLessThanOrEqualTo(MAXIMUM_CONCURRENT_TASKS);
59+
}))
60+
.exceptionally(
61+
err -> {
62+
LOG.info("Task {} was rejected", element);
63+
assertThat(err).isInstanceOf(QueueIsFullException.class);
64+
rejectedCount[0]++;
65+
return null;
66+
}))
67+
.toList();
68+
69+
// stubRunner will run whatever is active
70+
stubAsyncRunner.executeQueuedActions();
71+
assertThat(rejectedCount[0]).isEqualTo(expectedRejected);
72+
assertThat(
73+
stubMetricsSystem
74+
.getGauge(TekuMetricCategory.BEACON, "test_rejected_metric")
75+
.getValue())
76+
.isEqualTo(expectedRejected);
77+
checkQueueProgress(
78+
requests,
79+
maxQueueSize - MAXIMUM_CONCURRENT_TASKS,
80+
MAXIMUM_CONCURRENT_TASKS,
81+
MAXIMUM_CONCURRENT_TASKS + expectedRejected);
82+
}
83+
}

infrastructure/async/src/test/java/tech/pegasys/teku/infrastructure/async/ThrottlingTaskQueueTest.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,23 @@
2727
public class ThrottlingTaskQueueTest {
2828

2929
private static final Logger LOG = LogManager.getLogger();
30-
private static final int MAXIMUM_CONCURRENT_TASKS = 3;
30+
protected static final int MAXIMUM_CONCURRENT_TASKS = 3;
3131

32-
private final StubMetricsSystem stubMetricsSystem = new StubMetricsSystem();
32+
protected final StubMetricsSystem stubMetricsSystem = new StubMetricsSystem();
3333

34-
private final StubAsyncRunner stubAsyncRunner = new StubAsyncRunner();
34+
protected final StubAsyncRunner stubAsyncRunner = new StubAsyncRunner();
3535

36-
private final ThrottlingTaskQueue taskQueue =
37-
ThrottlingTaskQueue.create(
38-
MAXIMUM_CONCURRENT_TASKS, stubMetricsSystem, TekuMetricCategory.BEACON, "test_metric");
36+
protected static final String METRIC_NAME = "test_metric";
37+
protected TaskQueue taskQueue;
38+
39+
protected TaskQueue createThrottlingTaskQueue() {
40+
return ThrottlingTaskQueue.create(
41+
MAXIMUM_CONCURRENT_TASKS, stubMetricsSystem, TekuMetricCategory.BEACON, METRIC_NAME);
42+
}
3943

4044
@Test
4145
public void throttlesRequests() {
46+
taskQueue = createThrottlingTaskQueue();
4247
// queue tasks to run, they shouldn't start straight away.
4348
final List<SafeFuture<Void>> requests =
4449
IntStream.range(0, 10)
@@ -73,7 +78,7 @@ public void throttlesRequests() {
7378
checkQueueProgress(requests, 0, 0, 10);
7479
}
7580

76-
private void checkQueueProgress(
81+
protected void checkQueueProgress(
7782
final List<SafeFuture<Void>> requests,
7883
final int queueSize,
7984
final int inFlight,

networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/P2PConfig.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public class P2PConfig {
5757
// To avoid resource saturation and ensure capacity for other tasks, we limit historical data
5858
// queries to 5
5959
public static final int DEFAULT_HISTORICAL_DATA_MAX_CONCURRENT_QUERIES = 5;
60+
public static final int DEFAULT_HISTORICAL_MAX_QUERY_QUEUE_SIZE = 100_000;
6061

6162
private final Spec spec;
6263
private final NetworkConfig networkConfig;
@@ -69,6 +70,7 @@ public class P2PConfig {
6970
private final boolean subscribeAllSubnetsEnabled;
7071
private final int dasExtraCustodyGroupCount;
7172
private final int historicalDataMaxConcurrentQueries;
73+
private final int historicalDataMaxQueryQueueSize;
7274
private final int peerBlocksRateLimit;
7375
private final int peerBlobSidecarsRateLimit;
7476
private final int peerRequestLimit;
@@ -89,6 +91,7 @@ private P2PConfig(
8991
final boolean subscribeAllSubnetsEnabled,
9092
final int dasExtraCustodyGroupCount,
9193
final int historicalDataMaxConcurrentQueries,
94+
final int historicalDataMaxQueryQueueSize,
9295
final int peerBlocksRateLimit,
9396
final int peerBlobSidecarsRateLimit,
9497
final int peerRequestLimit,
@@ -107,6 +110,7 @@ private P2PConfig(
107110
this.subscribeAllSubnetsEnabled = subscribeAllSubnetsEnabled;
108111
this.dasExtraCustodyGroupCount = dasExtraCustodyGroupCount;
109112
this.historicalDataMaxConcurrentQueries = historicalDataMaxConcurrentQueries;
113+
this.historicalDataMaxQueryQueueSize = historicalDataMaxQueryQueueSize;
110114
this.peerBlocksRateLimit = peerBlocksRateLimit;
111115
this.peerBlobSidecarsRateLimit = peerBlobSidecarsRateLimit;
112116
this.peerRequestLimit = peerRequestLimit;
@@ -164,6 +168,10 @@ public int getHistoricalDataMaxConcurrentQueries() {
164168
return historicalDataMaxConcurrentQueries;
165169
}
166170

171+
public int getHistoricalDataMaxQueryQueueSize() {
172+
return historicalDataMaxQueryQueueSize;
173+
}
174+
167175
public int getPeerBlocksRateLimit() {
168176
return peerBlocksRateLimit;
169177
}
@@ -216,6 +224,7 @@ public static class Builder {
216224
private Boolean subscribeAllCustodySubnetsEnabled = DEFAULT_SUBSCRIBE_ALL_SUBNETS_ENABLED;
217225
private int dasExtraCustodyGroupCount = DEFAULT_DAS_EXTRA_CUSTODY_GROUP_COUNT;
218226
private int historicalDataMaxConcurrentQueries = DEFAULT_HISTORICAL_DATA_MAX_CONCURRENT_QUERIES;
227+
private int historicalDataMaxQueryQueueSize = DEFAULT_HISTORICAL_MAX_QUERY_QUEUE_SIZE;
219228
private Integer peerBlocksRateLimit = DEFAULT_PEER_BLOCKS_RATE_LIMIT;
220229
private Integer peerBlobSidecarsRateLimit = DEFAULT_PEER_BLOB_SIDECARS_RATE_LIMIT;
221230
private Integer peerRequestLimit = DEFAULT_PEER_REQUEST_LIMIT;
@@ -274,6 +283,7 @@ public P2PConfig build() {
274283
subscribeAllSubnetsEnabled,
275284
dasExtraCustodyGroupCount,
276285
historicalDataMaxConcurrentQueries,
286+
historicalDataMaxQueryQueueSize,
277287
peerBlocksRateLimit,
278288
peerBlobSidecarsRateLimit,
279289
peerRequestLimit,
@@ -338,6 +348,11 @@ public Builder historicalDataMaxConcurrentQueries(
338348
return this;
339349
}
340350

351+
public Builder historicalDataMaxQueryQueueSize(final int historicalDataMaxQueryQueueSize) {
352+
this.historicalDataMaxQueryQueueSize = historicalDataMaxQueryQueueSize;
353+
return this;
354+
}
355+
341356
public Builder subscribeAllCustodySubnetsEnabled(
342357
final Boolean subscribeAllCustodySubnetsEnabled) {
343358
checkNotNull(subscribeAllCustodySubnetsEnabled);

networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/DataColumnSidecarsByRootMessageHandler.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import tech.pegasys.teku.statetransition.datacolumns.log.rpc.DasReqRespLogger;
5151
import tech.pegasys.teku.statetransition.datacolumns.log.rpc.LoggingPeerId;
5252
import tech.pegasys.teku.statetransition.datacolumns.log.rpc.ReqRespResponseLogger;
53+
import tech.pegasys.teku.storage.api.ThrottlingStorageQueryChannel;
5354
import tech.pegasys.teku.storage.client.CombinedChainDataClient;
5455

5556
/**
@@ -228,10 +229,12 @@ private SafeFuture<Void> validateMinimumRequestEpoch(
228229
final DataColumnIdentifier identifier, final Optional<DataColumnSidecar> maybeSidecar) {
229230
return maybeSidecar
230231
.map(sidecar -> SafeFuture.completedFuture(Optional.of(sidecar.getSlot())))
231-
.orElse(
232-
combinedChainDataClient
233-
.getBlockByBlockRoot(identifier.blockRoot())
234-
.thenApply(maybeBlock -> maybeBlock.map(SignedBeaconBlock::getSlot)))
232+
.orElseGet(
233+
() ->
234+
combinedChainDataClient
235+
.getBlockByBlockRoot(identifier.blockRoot())
236+
.exceptionally(ThrottlingStorageQueryChannel::ignoreQueueIsFullException)
237+
.thenApply(maybeBlock -> maybeBlock.map(SignedBeaconBlock::getSlot)))
235238
.thenAcceptChecked(
236239
maybeSlot -> {
237240
if (maybeSlot.isEmpty()) {
@@ -261,7 +264,8 @@ private SafeFuture<Optional<DataColumnSidecar>> retrieveDataColumnSidecar(
261264
}
262265
// Fallback to non-canonical sidecar if the canonical one is not found
263266
return getNonCanonicalDataColumnSidecar(identifier);
264-
});
267+
})
268+
.exceptionally(ThrottlingStorageQueryChannel::ignoreQueueIsFullException);
265269
}
266270

267271
private void handleError(

0 commit comments

Comments
 (0)