
Commit 8d11dd2

Limit concurrent node requests (#122850)
1 parent c41caeb commit 8d11dd2

6 files changed: +236 −36 lines

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java

Lines changed: 5 additions & 0 deletions
@@ -30,6 +30,7 @@
  * @see #addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener)
  */
 public final class ExchangeSourceHandler {
+
     private final ExchangeBuffer buffer;
     private final Executor fetchExecutor;

@@ -56,6 +57,10 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor) {
         this.outstandingSources = new PendingInstances(() -> finishEarly(true, ActionListener.noop()));
     }

+    public boolean isFinished() {
+        return buffer.isFinished();
+    }
+
     private void checkFailure() {
         if (aborted) {
             throw new TaskCancelledException("remote sinks failed");
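
Context for this small addition: isFinished() exposes the buffer's terminal state so that callers can stop scheduling work once the query already has every page it needs (a satisfied LIMIT, for instance). The consuming side is in DataNodeComputeHandler further down; in sketch form, with identifiers exactly as in this commit:

    // Guard at the top of sendRequest (see DataNodeComputeHandler below):
    // if the exchange buffer no longer needs pages, skip the node entirely.
    if (exchangeSource.isFinished()) {
        nodeListener.onSkip();
        return;
    }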

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java

Lines changed: 45 additions & 3 deletions
@@ -10,15 +10,16 @@
 import org.apache.lucene.tests.util.LuceneTestCase;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.elasticsearch.common.util.iterable.Iterables;
 import org.elasticsearch.compute.operator.exchange.ExchangeService;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.plugins.Plugin;

@@ -39,13 +40,15 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;

 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;

 /**
  * Make sures that we can run many concurrent requests with large number of shards with any data_partitioning.

@@ -90,11 +93,11 @@ public void setupIndices() {
             .setMapping("user", "type=keyword", "tags", "type=keyword")
             .get();
         BulkRequestBuilder bulk = client().prepareBulk(index).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-        int numDocs = between(5, 10);
+        int numDocs = between(10, 25); // every shard has at least 1 doc
         for (int d = 0; d < numDocs; d++) {
             String user = randomFrom("u1", "u2", "u3");
             String tag = randomFrom("java", "elasticsearch", "lucene");
-            bulk.add(new IndexRequest().source(Map.of("user", user, "tags", tag)));
+            bulk.add(client().prepareIndex().setSource(Map.of("user", user, "tags", tag)));
         }
         bulk.get();
     }

@@ -256,4 +259,43 @@ public void testLimitConcurrentShards() {
             }
         }
     }
+
+    public void testCancelUnnecessaryRequests() {
+        assumeTrue("Requires pragmas", canUseQueryPragmas());
+        internalCluster().ensureAtLeastNumDataNodes(3);
+
+        var coordinatingNode = internalCluster().getNodeNames()[0];
+
+        var exchanges = new AtomicInteger(0);
+        var coordinatorNodeTransport = MockTransportService.getInstance(coordinatingNode);
+        coordinatorNodeTransport.addSendBehavior((connection, requestId, action, request, options) -> {
+            if (Objects.equals(action, ExchangeService.OPEN_EXCHANGE_ACTION_NAME)) {
+                logger.info("Opening exchange on node [{}]", connection.getNode().getId());
+                exchanges.incrementAndGet();
+            }
+            connection.sendRequest(requestId, action, request, options);
+        });
+
+        var query = EsqlQueryRequest.syncEsqlQueryRequest();
+        query.query("from test-* | LIMIT 1");
+        query.pragmas(new QueryPragmas(Settings.builder().put(QueryPragmas.MAX_CONCURRENT_NODES_PER_CLUSTER.getKey(), 1).build()));
+
+        try {
+            var result = safeExecute(client(coordinatingNode), EsqlQueryAction.INSTANCE, query);
+            assertThat(Iterables.size(result.rows()), equalTo(1L));
+            assertThat(exchanges.get(), lessThanOrEqualTo(1)); // 0 if result is populated from coordinating node
+        } catch (AssertionError e) {
+            client().admin().indices().stats(new IndicesStatsRequest()).actionGet().asMap().forEach((shard, stats) -> {
+                logger.info(
+                    "Shard {} node {} status {} docs {}",
+                    shard.shardId(),
+                    shard.currentNodeId(),
+                    shard.state(),
+                    stats.getStats().getDocs().getCount()
+                );
+            });
+        } finally {
+            coordinatorNodeTransport.clearAllRules();
+        }
+    }
 }

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

Lines changed: 15 additions & 6 deletions
@@ -98,15 +98,25 @@ void startComputeOnDataNodes(
         Runnable runOnTaskFailure,
         ActionListener<ComputeResponse> outListener
     ) {
-        final boolean allowPartialResults = configuration.allowPartialResults();
-        DataNodeRequestSender sender = new DataNodeRequestSender(transportService, esqlExecutor, parentTask, allowPartialResults) {
+        new DataNodeRequestSender(
+            transportService,
+            esqlExecutor,
+            parentTask,
+            configuration.allowPartialResults(),
+            configuration.pragmas().maxConcurrentNodesPerCluster()
+        ) {
             @Override
             protected void sendRequest(
                 DiscoveryNode node,
                 List<ShardId> shardIds,
                 Map<Index, AliasFilter> aliasFilters,
                 NodeListener nodeListener
             ) {
+                if (exchangeSource.isFinished()) {
+                    nodeListener.onSkip();
+                    return;
+                }
+
                 final AtomicLong pagesFetched = new AtomicLong();
                 var listener = ActionListener.wrap(nodeListener::onResponse, e -> nodeListener.onFailure(e, pagesFetched.get() > 0));
                 final Transport.Connection connection;

@@ -129,7 +139,7 @@ protected void sendRequest(
                     listener.delegateFailureAndWrap((l, unused) -> {
                         final Runnable onGroupFailure;
                         final CancellableTask groupTask;
-                        if (allowPartialResults) {
+                        if (configuration.allowPartialResults()) {
                             try {
                                 groupTask = computeService.createGroupTask(
                                     parentTask,

@@ -152,7 +162,7 @@ protected void sendRequest(
                         final var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, connection);
                         exchangeSource.addRemoteSink(
                             remoteSink,
-                            allowPartialResults == false,
+                            configuration.allowPartialResults() == false,
                             pagesFetched::incrementAndGet,
                             queryPragmas.concurrentExchangeClients(),
                             computeListener.acquireAvoid()

@@ -184,8 +194,7 @@ protected void sendRequest(
                     })
                 );
             }
-        };
-        sender.startComputeOnDataNodes(
+        }.startComputeOnDataNodes(
             clusterAlias,
             concreteIndices,
             originalIndices,
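
A stylistic note on the last hunk above: the named local `sender` is removed, and `startComputeOnDataNodes` is now invoked directly on the anonymous `DataNodeRequestSender` subclass expression. A self-contained, hypothetical illustration of that Java pattern (names invented for the sketch):

    abstract class Greeter {
        abstract String name();

        void greet() {
            System.out.println("hello " + name());
        }
    }

    class Demo {
        public static void main(String[] args) {
            // Same shape as }.startComputeOnDataNodes(...) in the diff:
            // call a method directly on the anonymous-subclass expression.
            new Greeter() {
                @Override
                String name() {
                    return "esql";
                }
            }.greet();
        }
    }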

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 52 additions & 12 deletions
@@ -46,6 +46,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;

 /**

@@ -57,18 +58,27 @@ abstract class DataNodeRequestSender {
     private final Executor esqlExecutor;
     private final CancellableTask rootTask;
     private final boolean allowPartialResults;
+    private final Semaphore concurrentRequests;
     private final ReentrantLock sendingLock = new ReentrantLock();
     private final Queue<ShardId> pendingShardIds = ConcurrentCollections.newQueue();
     private final Map<DiscoveryNode, Semaphore> nodePermits = new HashMap<>();
     private final Map<ShardId, ShardFailure> shardFailures = ConcurrentCollections.newConcurrentMap();
+    private final AtomicInteger skippedShards = new AtomicInteger();
     private final AtomicBoolean changed = new AtomicBoolean();
     private boolean reportedFailure = false; // guarded by sendingLock

-    DataNodeRequestSender(TransportService transportService, Executor esqlExecutor, CancellableTask rootTask, boolean allowPartialResults) {
+    DataNodeRequestSender(
+        TransportService transportService,
+        Executor esqlExecutor,
+        CancellableTask rootTask,
+        boolean allowPartialResults,
+        int concurrentRequests
+    ) {
         this.transportService = transportService;
         this.esqlExecutor = esqlExecutor;
         this.rootTask = rootTask;
         this.allowPartialResults = allowPartialResults;
+        this.concurrentRequests = concurrentRequests > 0 ? new Semaphore(concurrentRequests) : null;
     }

     final void startComputeOnDataNodes(

@@ -86,8 +96,8 @@ final void startComputeOnDataNodes(
                 profiles,
                 TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos),
                 targetShards.totalShards(),
-                targetShards.totalShards() - shardFailures.size(),
-                targetShards.skippedShards(),
+                targetShards.totalShards() - shardFailures.size() - skippedShards.get(),
+                targetShards.skippedShards() + skippedShards.get(),
                 shardFailures.size()
             );
         }))) {

@@ -159,6 +169,9 @@ private void sendOneNodeRequest(TargetShards targetShards, ComputeListener compu
         sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() {
             void onAfter(List<DriverProfile> profiles) {
                 nodePermits.get(request.node).release();
+                if (concurrentRequests != null) {
+                    concurrentRequests.release();
+                }
                 trySendingRequestsForPendingShards(targetShards, computeListener);
                 listener.onResponse(profiles);
             }

@@ -187,6 +200,16 @@ public void onFailure(Exception e, boolean receivedData) {
                 }
                 onAfter(List.of());
             }
+
+            @Override
+            public void onSkip() {
+                skippedShards.incrementAndGet();
+                if (rootTask.isCancelled()) {
+                    onFailure(new TaskCancelledException("null"), true);
+                } else {
+                    onResponse(new DataNodeComputeResponse(List.of(), Map.of()));
+                }
+            }
         });
     }

@@ -196,6 +219,8 @@ interface NodeListener {
         void onResponse(DataNodeComputeResponse response);

         void onFailure(Exception e, boolean receivedData);
+
+        void onSkip();
     }

     private static Exception unwrapFailure(Exception e) {

@@ -256,6 +281,7 @@ private List<NodeRequest> selectNodeRequests(TargetShards targetShards) {
         assert sendingLock.isHeldByCurrentThread();
         final Map<DiscoveryNode, List<ShardId>> nodeToShardIds = new HashMap<>();
         final Iterator<ShardId> shardsIt = pendingShardIds.iterator();
+
         while (shardsIt.hasNext()) {
             ShardId shardId = shardsIt.next();
             ShardFailure failure = shardFailures.get(shardId);

@@ -265,31 +291,45 @@ private List<NodeRequest> selectNodeRequests(TargetShards targetShards) {
             }
             TargetShard shard = targetShards.getShard(shardId);
             Iterator<DiscoveryNode> nodesIt = shard.remainingNodes.iterator();
-            DiscoveryNode selectedNode = null;
             while (nodesIt.hasNext()) {
                 DiscoveryNode node = nodesIt.next();
-                if (nodeToShardIds.containsKey(node) || nodePermits.get(node).tryAcquire()) {
+                List<ShardId> pendingRequest = nodeToShardIds.get(node);
+                if (pendingRequest != null) {
+                    pendingRequest.add(shard.shardId);
                     nodesIt.remove();
                     shardsIt.remove();
-                    selectedNode = node;
                     break;
                 }
-            }
-            if (selectedNode != null) {
-                nodeToShardIds.computeIfAbsent(selectedNode, unused -> new ArrayList<>()).add(shard.shardId);
+
+                if (concurrentRequests == null || concurrentRequests.tryAcquire()) {
+                    if (nodePermits.get(node).tryAcquire()) {
+                        pendingRequest = new ArrayList<>();
+                        pendingRequest.add(shard.shardId);
+                        nodeToShardIds.put(node, pendingRequest);
+
+                        nodesIt.remove();
+                        shardsIt.remove();
+
+                        break;
+                    } else if (concurrentRequests != null) {
+                        concurrentRequests.release();
+                    }
+                }
             }
         }
+
         final List<NodeRequest> nodeRequests = new ArrayList<>(nodeToShardIds.size());
-        for (var e : nodeToShardIds.entrySet()) {
-            List<ShardId> shardIds = e.getValue();
+        for (var entry : nodeToShardIds.entrySet()) {
+            var node = entry.getKey();
+            var shardIds = entry.getValue();
             Map<Index, AliasFilter> aliasFilters = new HashMap<>();
             for (ShardId shardId : shardIds) {
                 var aliasFilter = targetShards.getShard(shardId).aliasFilter;
                 if (aliasFilter != null) {
                     aliasFilters.put(shardId.getIndex(), aliasFilter);
                 }
             }
-            nodeRequests.add(new NodeRequest(e.getKey(), shardIds, aliasFilters));
+            nodeRequests.add(new NodeRequest(node, shardIds, aliasFilters));
         }
         return nodeRequests;
     }
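
For readers skimming the diff: the concurrency scheme reduces to two semaphore layers. An optional cluster-wide `concurrentRequests` semaphore (new in this commit) is acquired before the existing per-node permit, rolled back if the node permit is unavailable, and both are released in `onAfter`. A minimal standalone sketch of that ordering, with hypothetical class and method names (only the acquire/release order mirrors the production code):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Semaphore;

    // Illustrative only; not the production API of DataNodeRequestSender.
    final class PermitSketch {
        private final Semaphore concurrentRequests; // null = unlimited, as in the commit
        private final Map<String, Semaphore> nodePermits = new ConcurrentHashMap<>();

        PermitSketch(int maxConcurrentNodes) {
            this.concurrentRequests = maxConcurrentNodes > 0 ? new Semaphore(maxConcurrentNodes) : null;
        }

        // Mirrors selectNodeRequests: cluster-wide permit first, then the node
        // permit; roll back the cluster permit if the node is already busy.
        boolean tryAcquire(String nodeId) {
            if (concurrentRequests == null || concurrentRequests.tryAcquire()) {
                if (nodePermits.computeIfAbsent(nodeId, n -> new Semaphore(1)).tryAcquire()) {
                    return true;
                } else if (concurrentRequests != null) {
                    concurrentRequests.release();
                }
            }
            return false;
        }

        // Mirrors onAfter: release the node permit, then the cluster-wide permit.
        void release(String nodeId) {
            nodePermits.get(nodeId).release();
            if (concurrentRequests != null) {
                concurrentRequests.release();
            }
        }
    }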

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java

Lines changed: 11 additions & 1 deletion
@@ -55,7 +55,10 @@ public final class QueryPragmas implements Writeable {
      */
     public static final Setting<TimeValue> STATUS_INTERVAL = Setting.timeSetting("status_interval", Driver.DEFAULT_STATUS_INTERVAL);

-    public static final Setting<Integer> MAX_CONCURRENT_SHARDS_PER_NODE = Setting.intSetting("max_concurrent_shards_per_node", 10, 1, 100);
+    public static final Setting<Integer> MAX_CONCURRENT_NODES_PER_CLUSTER = //
+        Setting.intSetting("max_concurrent_nodes_per_cluster", -1, -1, 100);
+    public static final Setting<Integer> MAX_CONCURRENT_SHARDS_PER_NODE = //
+        Setting.intSetting("max_concurrent_shards_per_node", 10, 1, 100);

     public static final Setting<Boolean> NODE_LEVEL_REDUCTION = Setting.boolSetting("node_level_reduction", true);

@@ -122,6 +125,13 @@ public int enrichMaxWorkers() {
         return ENRICH_MAX_WORKERS.get(settings);
     }

+    /**
+     * The maximum number of nodes to be queried at once by this query. This is safeguard to avoid overloading the cluster.
+     */
+    public int maxConcurrentNodesPerCluster() {
+        return MAX_CONCURRENT_NODES_PER_CLUSTER.get(settings);
+    }
+
     /**
      * The maximum number of shards can be executed concurrently on a single node by this query. This is a safeguard to avoid
      * opening and holding many shards (equivalent to many file descriptors) or having too many field infos created by a single query.
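
Usage, as exercised by testCancelUnnecessaryRequests above (snippet lifted from the test; pragmas are gated behind canUseQueryPragmas(), and the test-* pattern is the suite's fixture data):

    var query = EsqlQueryRequest.syncEsqlQueryRequest();
    query.query("from test-* | LIMIT 1");
    query.pragmas(new QueryPragmas(Settings.builder().put(QueryPragmas.MAX_CONCURRENT_NODES_PER_CLUSTER.getKey(), 1).build()));

With the default of -1 the semaphore is disabled entirely (see the constructor change in DataNodeRequestSender above), so existing queries behave as before unless the pragma is set.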
