Skip to content

Commit ae96763

Browse files
committed
upd
1 parent d4850fc commit ae96763

File tree

1 file changed

+25
-20
lines changed

1 file changed

+25
-20
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ private List<NodeRequest> selectNodeRequests(TargetShards targetShards) {
268268
assert sendingLock.isHeldByCurrentThread();
269269
final Map<DiscoveryNode, List<ShardId>> nodeToShardIds = new HashMap<>();
270270
final Iterator<ShardId> shardsIt = pendingShardIds.iterator();
271+
271272
while (shardsIt.hasNext()) {
272273
ShardId shardId = shardsIt.next();
273274
ShardFailure failure = shardFailures.get(shardId);
@@ -277,41 +278,45 @@ private List<NodeRequest> selectNodeRequests(TargetShards targetShards) {
277278
}
278279
TargetShard shard = targetShards.getShard(shardId);
279280
Iterator<DiscoveryNode> nodesIt = shard.remainingNodes.iterator();
280-
DiscoveryNode selectedNode = null;
281281
while (nodesIt.hasNext()) {
282282
DiscoveryNode node = nodesIt.next();
283-
if (nodeToShardIds.containsKey(node) || nodePermits.get(node).tryAcquire()) {
283+
List<ShardId> pendingRequest = nodeToShardIds.get(node);
284+
if (pendingRequest != null) {
285+
pendingRequest.add(shard.shardId);
284286
nodesIt.remove();
285287
shardsIt.remove();
286-
selectedNode = node;
287288
break;
288289
}
289-
}
290-
if (selectedNode != null) {
291-
nodeToShardIds.computeIfAbsent(selectedNode, unused -> new ArrayList<>()).add(shard.shardId);
290+
291+
if (concurrentRequests == null || concurrentRequests.tryAcquire()) {
292+
if (nodePermits.get(node).tryAcquire()) {
293+
pendingRequest = new ArrayList<>();
294+
pendingRequest.add(shard.shardId);
295+
nodeToShardIds.put(node, pendingRequest);
296+
297+
nodesIt.remove();
298+
shardsIt.remove();
299+
300+
break;
301+
} else if (concurrentRequests != null) {
302+
concurrentRequests.release();
303+
}
304+
}
292305
}
293306
}
294307

295308
final List<NodeRequest> nodeRequests = new ArrayList<>(nodeToShardIds.size());
296309
for (var entry : nodeToShardIds.entrySet()) {
297310
var node = entry.getKey();
298311
var shardIds = entry.getValue();
299-
if (concurrentRequests == null || concurrentRequests.tryAcquire()) {
300-
Map<Index, AliasFilter> aliasFilters = new HashMap<>();
301-
for (ShardId shardId : shardIds) {
302-
var aliasFilter = targetShards.getShard(shardId).aliasFilter;
303-
if (aliasFilter != null) {
304-
aliasFilters.put(shardId.getIndex(), aliasFilter);
305-
}
306-
}
307-
nodeRequests.add(new NodeRequest(node, shardIds, aliasFilters));
308-
} else {
309-
pendingShardIds.addAll(shardIds);
310-
for (ShardId shardId : shardIds) {
311-
targetShards.getShard(shardId).remainingNodes.add(node);
312+
Map<Index, AliasFilter> aliasFilters = new HashMap<>();
313+
for (ShardId shardId : shardIds) {
314+
var aliasFilter = targetShards.getShard(shardId).aliasFilter;
315+
if (aliasFilter != null) {
316+
aliasFilters.put(shardId.getIndex(), aliasFilter);
312317
}
313-
nodePermits.get(node).release();
314318
}
319+
nodeRequests.add(new NodeRequest(node, shardIds, aliasFilters));
315320
}
316321
return nodeRequests;
317322
}

0 commit comments

Comments
 (0)