Commit 551f4c7

Limit concurrent node requests (elastic#122850) (elastic#127742)

(cherry picked from commit 8d11dd2)

1 parent 6b4f024 commit 551f4c7
File tree: 6 files changed, +228 -37 lines changed
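This commit adds a `max_concurrent_nodes_per_cluster` query pragma that caps how many data nodes an ES|QL query contacts concurrently (default -1, i.e. no limit). A minimal sketch of how the pragma is set, modeled directly on the new integration test below — the classes shown (`EsqlQueryRequest`, `QueryPragmas`) are the ones touched by this commit, while the index pattern and surrounding wiring are illustrative:

```java
// Sketch based on testCancelUnnecessaryRequests (added below in ManyShardsIT).
var query = EsqlQueryRequest.syncEsqlQueryRequest();
query.query("from test-* | LIMIT 1");
query.pragmas(
    new QueryPragmas(
        Settings.builder()
            .put(QueryPragmas.MAX_CONCURRENT_NODES_PER_CLUSTER.getKey(), 1) // query at most one node at a time
            .build()
    )
);
```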

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java

Lines changed: 5 additions & 0 deletions

```diff
@@ -30,6 +30,7 @@
  * @see #addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener)
  */
 public final class ExchangeSourceHandler {
+
     private final ExchangeBuffer buffer;
     private final Executor fetchExecutor;
@@ -56,6 +57,10 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor) {
         this.outstandingSources = new PendingInstances(() -> finishEarly(true, ActionListener.noop()));
     }
 
+    public boolean isFinished() {
+        return buffer.isFinished();
+    }
+
     private void checkFailure() {
         if (aborted) {
             throw new TaskCancelledException("remote sinks failed");
```
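The new `isFinished()` accessor exposes whether the exchange buffer has completed, which lets the data-node request sender short-circuit requests that can no longer contribute pages. The guard below is the exact use added in DataNodeComputeHandler further down:

```java
// From DataNodeComputeHandler.sendRequest (this commit): skip the node
// entirely when the exchange buffer already has everything it needs,
// e.g. because a LIMIT was satisfied by earlier responses.
if (exchangeSource.isFinished()) {
    nodeListener.onSkip();
    return;
}
```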

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java

Lines changed: 45 additions & 3 deletions

```diff
@@ -10,15 +10,16 @@
 import org.apache.lucene.tests.util.LuceneTestCase;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.elasticsearch.common.util.iterable.Iterables;
 import org.elasticsearch.compute.operator.exchange.ExchangeService;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.plugins.Plugin;
@@ -39,13 +40,15 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
 /**
  * Make sures that we can run many concurrent requests with large number of shards with any data_partitioning.
@@ -91,11 +94,11 @@ public void setupIndices() {
             .setMapping("user", "type=keyword", "tags", "type=keyword")
             .get();
         BulkRequestBuilder bulk = client().prepareBulk(index).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-        int numDocs = between(5, 10);
+        int numDocs = between(10, 25); // every shard has at least 1 doc
         for (int d = 0; d < numDocs; d++) {
             String user = randomFrom("u1", "u2", "u3");
             String tag = randomFrom("java", "elasticsearch", "lucene");
-            bulk.add(new IndexRequest().source(Map.of("user", user, "tags", tag)));
+            bulk.add(client().prepareIndex().setSource(Map.of("user", user, "tags", tag)));
         }
         bulk.get();
     }
@@ -257,4 +260,43 @@ public void testLimitConcurrentShards() {
             }
         }
     }
+
+    public void testCancelUnnecessaryRequests() {
+        assumeTrue("Requires pragmas", canUseQueryPragmas());
+        internalCluster().ensureAtLeastNumDataNodes(3);
+
+        var coordinatingNode = internalCluster().getNodeNames()[0];
+
+        var exchanges = new AtomicInteger(0);
+        var coordinatorNodeTransport = MockTransportService.getInstance(coordinatingNode);
+        coordinatorNodeTransport.addSendBehavior((connection, requestId, action, request, options) -> {
+            if (Objects.equals(action, ExchangeService.OPEN_EXCHANGE_ACTION_NAME)) {
+                logger.info("Opening exchange on node [{}]", connection.getNode().getId());
+                exchanges.incrementAndGet();
+            }
+            connection.sendRequest(requestId, action, request, options);
+        });
+
+        var query = EsqlQueryRequest.syncEsqlQueryRequest();
+        query.query("from test-* | LIMIT 1");
+        query.pragmas(new QueryPragmas(Settings.builder().put(QueryPragmas.MAX_CONCURRENT_NODES_PER_CLUSTER.getKey(), 1).build()));
+
+        try {
+            var result = safeExecute(client(coordinatingNode), EsqlQueryAction.INSTANCE, query);
+            assertThat(Iterables.size(result.rows()), equalTo(1L));
+            assertThat(exchanges.get(), lessThanOrEqualTo(1));// 0 if result is populated from coordinating node
+        } catch (AssertionError e) {
+            client().admin().indices().stats(new IndicesStatsRequest()).actionGet().asMap().forEach((shard, stats) -> {
+                logger.info(
+                    "Shard {} node {} status {} docs {}",
+                    shard.shardId(),
+                    shard.currentNodeId(),
+                    shard.state(),
+                    stats.getStats().getDocs().getCount()
+                );
+            });
+        } finally {
+            coordinatorNodeTransport.clearAllRules();
+        }
+    }
 }
```
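The assertion logic here: with `max_concurrent_nodes_per_cluster` pinned to 1 and a `LIMIT 1` query, at most one `OPEN_EXCHANGE` request should leave the coordinator before the exchange buffer finishes, hence `lessThanOrEqualTo(1)`; it can be 0 when the coordinating node serves the row from a local shard, as the inline comment notes. On an assertion failure, the test logs per-shard document counts and locations to help diagnose which node should have been contacted.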

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

Lines changed: 11 additions & 7 deletions

```diff
@@ -98,13 +98,13 @@ void startComputeOnDataNodes(
         Runnable runOnTaskFailure,
         ActionListener<ComputeResponse> outListener
     ) {
-        final boolean allowPartialResults = configuration.allowPartialResults();
-        DataNodeRequestSender sender = new DataNodeRequestSender(
+        new DataNodeRequestSender(
             transportService,
             esqlExecutor,
             clusterAlias,
             parentTask,
-            allowPartialResults
+            configuration.allowPartialResults(),
+            configuration.pragmas().maxConcurrentNodesPerCluster()
         ) {
             @Override
             protected void sendRequest(
@@ -113,6 +113,11 @@ protected void sendRequest(
                 Map<Index, AliasFilter> aliasFilters,
                 NodeListener nodeListener
             ) {
+                if (exchangeSource.isFinished()) {
+                    nodeListener.onSkip();
+                    return;
+                }
+
                 final AtomicLong pagesFetched = new AtomicLong();
                 var listener = ActionListener.wrap(nodeListener::onResponse, e -> nodeListener.onFailure(e, pagesFetched.get() > 0));
                 final Transport.Connection connection;
@@ -135,7 +140,7 @@ protected void sendRequest(
                     listener.delegateFailureAndWrap((l, unused) -> {
                         final Runnable onGroupFailure;
                         final CancellableTask groupTask;
-                        if (allowPartialResults) {
+                        if (configuration.allowPartialResults()) {
                             try {
                                 groupTask = computeService.createGroupTask(
                                     parentTask,
@@ -158,7 +163,7 @@ protected void sendRequest(
                         final var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, connection);
                         exchangeSource.addRemoteSink(
                             remoteSink,
-                            allowPartialResults == false,
+                            configuration.allowPartialResults() == false,
                             pagesFetched::incrementAndGet,
                             queryPragmas.concurrentExchangeClients(),
                             computeListener.acquireAvoid()
@@ -190,8 +195,7 @@ protected void sendRequest(
                     })
                 );
             }
-        };
-        sender.startComputeOnDataNodes(
+        }.startComputeOnDataNodes(
             clusterAlias,
             concreteIndices,
             originalIndices,
```
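Besides the early-exit guard on `exchangeSource.isFinished()`, this hunk is a small refactor: the former local `sender` variable is inlined into an anonymous `DataNodeRequestSender` subclass chained directly into `.startComputeOnDataNodes(...)`, the new `maxConcurrentNodesPerCluster()` pragma value is threaded into the sender's constructor, and `allowPartialResults` is now read from `configuration` at each use rather than captured up front.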

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 47 additions & 12 deletions

```diff
@@ -48,6 +48,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
@@ -60,10 +61,12 @@ abstract class DataNodeRequestSender {
     private final String clusterAlias;
     private final CancellableTask rootTask;
     private final boolean allowPartialResults;
+    private final Semaphore concurrentRequests;
     private final ReentrantLock sendingLock = new ReentrantLock();
     private final Queue<ShardId> pendingShardIds = ConcurrentCollections.newQueue();
     private final Map<DiscoveryNode, Semaphore> nodePermits = new HashMap<>();
     private final Map<ShardId, ShardFailure> shardFailures = ConcurrentCollections.newConcurrentMap();
+    private final AtomicInteger skippedShards = new AtomicInteger();
     private final AtomicBoolean changed = new AtomicBoolean();
     private boolean reportedFailure = false; // guarded by sendingLock
 
@@ -72,13 +75,15 @@ abstract class DataNodeRequestSender {
         Executor esqlExecutor,
         String clusterAlias,
         CancellableTask rootTask,
-        boolean allowPartialResults
+        boolean allowPartialResults,
+        int concurrentRequests
     ) {
         this.transportService = transportService;
         this.esqlExecutor = esqlExecutor;
         this.clusterAlias = clusterAlias;
         this.rootTask = rootTask;
         this.allowPartialResults = allowPartialResults;
+        this.concurrentRequests = concurrentRequests > 0 ? new Semaphore(concurrentRequests) : null;
     }
 
     final void startComputeOnDataNodes(
@@ -96,8 +101,8 @@ final void startComputeOnDataNodes(
                 profiles,
                 TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos),
                 targetShards.totalShards(),
-                targetShards.totalShards() - shardFailures.size(),
-                targetShards.skippedShards(),
+                targetShards.totalShards() - shardFailures.size() - skippedShards.get(),
+                targetShards.skippedShards() + skippedShards.get(),
                 shardFailures.size(),
                 selectFailures()
             );
@@ -191,6 +196,9 @@ private void sendOneNodeRequest(TargetShards targetShards, ComputeListener computeListener) {
         sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() {
             void onAfter(List<DriverProfile> profiles) {
                 nodePermits.get(request.node).release();
+                if (concurrentRequests != null) {
+                    concurrentRequests.release();
+                }
                 trySendingRequestsForPendingShards(targetShards, computeListener);
                 listener.onResponse(profiles);
             }
@@ -219,6 +227,16 @@ public void onFailure(Exception e, boolean receivedData) {
                 }
                 onAfter(List.of());
             }
+
+            @Override
+            public void onSkip() {
+                skippedShards.incrementAndGet();
+                if (rootTask.isCancelled()) {
+                    onFailure(new TaskCancelledException("null"), true);
+                } else {
+                    onResponse(new DataNodeComputeResponse(List.of(), Map.of()));
+                }
+            }
         });
     }
 
@@ -228,6 +246,8 @@ interface NodeListener {
         void onResponse(DataNodeComputeResponse response);
 
         void onFailure(Exception e, boolean receivedData);
+
+        void onSkip();
     }
 
     private static Exception unwrapFailure(Exception e) {
@@ -288,6 +308,7 @@ private List<NodeRequest> selectNodeRequests(TargetShards targetShards) {
         assert sendingLock.isHeldByCurrentThread();
         final Map<DiscoveryNode, List<ShardId>> nodeToShardIds = new HashMap<>();
         final Iterator<ShardId> shardsIt = pendingShardIds.iterator();
+
         while (shardsIt.hasNext()) {
             ShardId shardId = shardsIt.next();
             ShardFailure failure = shardFailures.get(shardId);
@@ -297,31 +318,45 @@ private List<NodeRequest> selectNodeRequests(TargetShards targetShards) {
             }
             TargetShard shard = targetShards.getShard(shardId);
             Iterator<DiscoveryNode> nodesIt = shard.remainingNodes.iterator();
-            DiscoveryNode selectedNode = null;
             while (nodesIt.hasNext()) {
                 DiscoveryNode node = nodesIt.next();
-                if (nodeToShardIds.containsKey(node) || nodePermits.get(node).tryAcquire()) {
+                List<ShardId> pendingRequest = nodeToShardIds.get(node);
+                if (pendingRequest != null) {
+                    pendingRequest.add(shard.shardId);
                     nodesIt.remove();
                     shardsIt.remove();
-                    selectedNode = node;
                     break;
                 }
-            }
-            if (selectedNode != null) {
-                nodeToShardIds.computeIfAbsent(selectedNode, unused -> new ArrayList<>()).add(shard.shardId);
+
+                if (concurrentRequests == null || concurrentRequests.tryAcquire()) {
+                    if (nodePermits.get(node).tryAcquire()) {
+                        pendingRequest = new ArrayList<>();
+                        pendingRequest.add(shard.shardId);
+                        nodeToShardIds.put(node, pendingRequest);
+
+                        nodesIt.remove();
+                        shardsIt.remove();
+
+                        break;
+                    } else if (concurrentRequests != null) {
+                        concurrentRequests.release();
+                    }
+                }
             }
         }
+
         final List<NodeRequest> nodeRequests = new ArrayList<>(nodeToShardIds.size());
-        for (var e : nodeToShardIds.entrySet()) {
-            List<ShardId> shardIds = e.getValue();
+        for (var entry : nodeToShardIds.entrySet()) {
+            var node = entry.getKey();
+            var shardIds = entry.getValue();
             Map<Index, AliasFilter> aliasFilters = new HashMap<>();
             for (ShardId shardId : shardIds) {
                 var aliasFilter = targetShards.getShard(shardId).aliasFilter;
                 if (aliasFilter != null) {
                     aliasFilters.put(shardId.getIndex(), aliasFilter);
                 }
             }
-            nodeRequests.add(new NodeRequest(e.getKey(), shardIds, aliasFilters));
+            nodeRequests.add(new NodeRequest(node, shardIds, aliasFilters));
         }
         return nodeRequests;
     }
```
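The heart of the limit is a two-level permit scheme in `selectNodeRequests`: an optional cluster-wide semaphore (`concurrentRequests`) gates how many node requests may be in flight at once, layered on top of the existing per-node `nodePermits`. The cluster permit is acquired first and handed back if the node permit cannot be obtained, so a saturated node never pins a cluster-wide slot; both permits are released in `onAfter` once the node responds. A standalone sketch of that ordering — the class and method names here are hypothetical, only the acquire/release discipline mirrors the production code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// Hypothetical standalone sketch of the permit ordering in selectNodeRequests.
final class ConcurrencyLimiter {
    private final Semaphore clusterPermits; // null means no cluster-wide limit (pragma <= 0)
    private final int maxPerNode;
    private final Map<String, Semaphore> nodePermits = new ConcurrentHashMap<>();

    ConcurrencyLimiter(int maxConcurrentNodes, int maxPerNode) {
        this.clusterPermits = maxConcurrentNodes > 0 ? new Semaphore(maxConcurrentNodes) : null;
        this.maxPerNode = maxPerNode;
    }

    /** Try to reserve a slot for one request to the given node; never blocks. */
    boolean tryAcquire(String nodeId) {
        // Cluster-wide permit first...
        if (clusterPermits != null && clusterPermits.tryAcquire() == false) {
            return false; // too many nodes already in flight
        }
        // ...then the per-node permit.
        Semaphore perNode = nodePermits.computeIfAbsent(nodeId, n -> new Semaphore(maxPerNode));
        if (perNode.tryAcquire()) {
            return true;
        }
        if (clusterPermits != null) {
            clusterPermits.release(); // node is busy: hand the cluster slot back
        }
        return false;
    }

    /** Release both permits once the node's response (or failure) is handled, cf. onAfter. */
    void release(String nodeId) {
        nodePermits.get(nodeId).release();
        if (clusterPermits != null) {
            clusterPermits.release();
        }
    }
}
```

One further wrinkle in the production code that the sketch omits: a shard can piggyback on a node's already-pending request in the current batch without consuming new permits, which is why `nodeToShardIds.get(node)` is checked before either semaphore is touched.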

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java

Lines changed: 11 additions & 1 deletion

```diff
@@ -55,7 +55,10 @@ public final class QueryPragmas implements Writeable {
      */
     public static final Setting<TimeValue> STATUS_INTERVAL = Setting.timeSetting("status_interval", Driver.DEFAULT_STATUS_INTERVAL);
 
-    public static final Setting<Integer> MAX_CONCURRENT_SHARDS_PER_NODE = Setting.intSetting("max_concurrent_shards_per_node", 10, 1, 100);
+    public static final Setting<Integer> MAX_CONCURRENT_NODES_PER_CLUSTER = //
+        Setting.intSetting("max_concurrent_nodes_per_cluster", -1, -1, 100);
+    public static final Setting<Integer> MAX_CONCURRENT_SHARDS_PER_NODE = //
+        Setting.intSetting("max_concurrent_shards_per_node", 10, 1, 100);
 
     public static final Setting<Boolean> NODE_LEVEL_REDUCTION = Setting.boolSetting("node_level_reduction", true);
 
@@ -122,6 +125,13 @@ public int enrichMaxWorkers() {
         return ENRICH_MAX_WORKERS.get(settings);
     }
 
+    /**
+     * The maximum number of nodes to be queried at once by this query. This is safeguard to avoid overloading the cluster.
+     */
+    public int maxConcurrentNodesPerCluster() {
+        return MAX_CONCURRENT_NODES_PER_CLUSTER.get(settings);
+    }
+
     /**
      * The maximum number of shards can be executed concurrently on a single node by this query. This is a safeguard to avoid
      * opening and holding many shards (equivalent to many file descriptors) or having too many field infos created by a single query.
```
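Note the default of -1, which disables the limit entirely: `DataNodeRequestSender` only creates its `concurrentRequests` semaphore when the pragma value is positive (`concurrentRequests > 0 ? new Semaphore(concurrentRequests) : null` above), and the setting accepts values up to 100 to match the existing `max_concurrent_shards_per_node` ceiling.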
