Skip to content

Commit d4850fc

Browse files
committed
upd
1 parent eec2039 commit d4850fc

File tree

3 files changed

+15
-14
lines changed

3 files changed

+15
-14
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ abstract class DataNodeRequestSender {
7575
this.esqlExecutor = esqlExecutor;
7676
this.rootTask = rootTask;
7777
this.allowPartialResults = allowPartialResults;
78-
this.concurrentRequests = new Semaphore(concurrentRequests);
78+
this.concurrentRequests = concurrentRequests > 1 ? new Semaphore(concurrentRequests) : null;
7979
}
8080

8181
final void startComputeOnDataNodes(
@@ -168,7 +168,9 @@ private void sendOneNodeRequest(TargetShards targetShards, ComputeListener compu
168168
sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() {
169169
void onAfter(List<DriverProfile> profiles) {
170170
nodePermits.get(request.node).release();
171-
concurrentRequests.release();
171+
if (concurrentRequests != null) {
172+
concurrentRequests.release();
173+
}
172174
trySendingRequestsForPendingShards(targetShards, computeListener);
173175
listener.onResponse(profiles);
174176
}
@@ -290,12 +292,11 @@ private List<NodeRequest> selectNodeRequests(TargetShards targetShards) {
290292
}
291293
}
292294

293-
var size = Math.min(concurrentRequests.availablePermits(), nodeToShardIds.size());
294-
final List<NodeRequest> nodeRequests = new ArrayList<>(size);
295+
final List<NodeRequest> nodeRequests = new ArrayList<>(nodeToShardIds.size());
295296
for (var entry : nodeToShardIds.entrySet()) {
296297
var node = entry.getKey();
297298
var shardIds = entry.getValue();
298-
if (concurrentRequests.tryAcquire()) {
299+
if (concurrentRequests == null || concurrentRequests.tryAcquire()) {
299300
Map<Index, AliasFilter> aliasFilters = new HashMap<>();
300301
for (ShardId shardId : shardIds) {
301302
var aliasFilter = targetShards.getShard(shardId).aliasFilter;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public final class QueryPragmas implements Writeable {
5656
public static final Setting<TimeValue> STATUS_INTERVAL = Setting.timeSetting("status_interval", Driver.DEFAULT_STATUS_INTERVAL);
5757

5858
public static final Setting<Integer> MAX_CONCURRENT_NODES_PER_CLUSTER = //
59-
Setting.intSetting("max_concurrent_nodes_per_cluster", 10, 1, 100);
59+
Setting.intSetting("max_concurrent_nodes_per_cluster", -1, -1, 100);
6060
public static final Setting<Integer> MAX_CONCURRENT_SHARDS_PER_NODE = //
6161
Setting.intSetting("max_concurrent_shards_per_node", 10, 1, 100);
6262

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void testEmpty() {
9494
var future = sendRequests(
9595
List.of(),
9696
randomBoolean(),
97-
10,
97+
-1,
9898
(node, shardIds, aliasFilters, listener) -> fail("expect no data-node request is sent")
9999
);
100100
var resp = safeGet(future);
@@ -109,7 +109,7 @@ public void testOnePass() {
109109
targetShard(shard4, node2, node3)
110110
);
111111
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
112-
var future = sendRequests(targetShards, randomBoolean(), 10, (node, shardIds, aliasFilters, listener) -> {
112+
var future = sendRequests(targetShards, randomBoolean(), -1, (node, shardIds, aliasFilters, listener) -> {
113113
sent.add(new NodeRequest(node, shardIds, aliasFilters));
114114
runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
115115
});
@@ -121,15 +121,15 @@ public void testOnePass() {
121121
public void testMissingShards() {
122122
{
123123
var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3));
124-
var future = sendRequests(targetShards, false, 10, (node, shardIds, aliasFilters, listener) -> {
124+
var future = sendRequests(targetShards, false, -1, (node, shardIds, aliasFilters, listener) -> {
125125
fail("expect no data-node request is sent when target shards are missing");
126126
});
127127
var error = expectThrows(NoShardAvailableActionException.class, future::actionGet);
128128
assertThat(error.getMessage(), containsString("no shard copies found"));
129129
}
130130
{
131131
var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3));
132-
var future = sendRequests(targetShards, true, 10, (node, shardIds, aliasFilters, listener) -> {
132+
var future = sendRequests(targetShards, true, -1, (node, shardIds, aliasFilters, listener) -> {
133133
assertThat(shard3, not(in(shardIds)));
134134
runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
135135
});
@@ -149,7 +149,7 @@ public void testRetryThenSuccess() {
149149
targetShard(shard5, node1, node3, node2)
150150
);
151151
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
152-
var future = sendRequests(targetShards, randomBoolean(), 10, (node, shardIds, aliasFilters, listener) -> {
152+
var future = sendRequests(targetShards, randomBoolean(), -1, (node, shardIds, aliasFilters, listener) -> {
153153
sent.add(new NodeRequest(node, shardIds, aliasFilters));
154154
Map<ShardId, Exception> failures = new HashMap<>();
155155
if (node.equals(node1) && shardIds.contains(shard5)) {
@@ -181,7 +181,7 @@ public void testRetryButFail() {
181181
targetShard(shard5, node1, node3, node2)
182182
);
183183
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
184-
var future = sendRequests(targetShards, false, 10, (node, shardIds, aliasFilters, listener) -> {
184+
var future = sendRequests(targetShards, false, -1, (node, shardIds, aliasFilters, listener) -> {
185185
sent.add(new NodeRequest(node, shardIds, aliasFilters));
186186
Map<ShardId, Exception> failures = new HashMap<>();
187187
if (shardIds.contains(shard5)) {
@@ -207,7 +207,7 @@ public void testDoNotRetryOnRequestLevelFailure() {
207207
var targetShards = List.of(targetShard(shard1, node1), targetShard(shard2, node2), targetShard(shard3, node1));
208208
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
209209
AtomicBoolean failed = new AtomicBoolean();
210-
var future = sendRequests(targetShards, false, 10, (node, shardIds, aliasFilters, listener) -> {
210+
var future = sendRequests(targetShards, false, -1, (node, shardIds, aliasFilters, listener) -> {
211211
sent.add(new NodeRequest(node, shardIds, aliasFilters));
212212
if (node1.equals(node) && failed.compareAndSet(false, true)) {
213213
runWithDelay(() -> listener.onFailure(new IOException("test request level failure"), true));
@@ -227,7 +227,7 @@ public void testAllowPartialResults() {
227227
var targetShards = List.of(targetShard(shard1, node1), targetShard(shard2, node2), targetShard(shard3, node1, node2));
228228
Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
229229
AtomicBoolean failed = new AtomicBoolean();
230-
var future = sendRequests(targetShards, true, 10, (node, shardIds, aliasFilters, listener) -> {
230+
var future = sendRequests(targetShards, true, -1, (node, shardIds, aliasFilters, listener) -> {
231231
sent.add(new NodeRequest(node, shardIds, aliasFilters));
232232
if (node1.equals(node) && failed.compareAndSet(false, true)) {
233233
runWithDelay(() -> listener.onFailure(new IOException("test request level failure"), true));

0 commit comments

Comments
 (0)