Skip to content

Commit 37d1053

Browse files
idegtiarenkoafoucret
authored andcommitted
Fix request processing scheduling (elastic#127464)
1 parent 8d5809e commit 37d1053

File tree

3 files changed

+54
-28
lines changed

3 files changed

+54
-28
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -396,9 +396,6 @@ tests:
396396
- class: org.elasticsearch.packaging.test.DockerTests
397397
method: test026InstallBundledRepositoryPluginsViaConfigFile
398398
issue: https://github.com/elastic/elasticsearch/issues/127158
399-
- class: org.elasticsearch.xpack.esql.plugin.DataNodeRequestSenderTests
400-
method: testRetryOnlyMovedShards
401-
issue: https://github.com/elastic/elasticsearch/issues/127168
402399
- class: org.elasticsearch.xpack.esql.plugin.DataNodeRequestSenderIT
403400
method: testSearchWhileRelocating
404401
issue: https://github.com/elastic/elasticsearch/issues/127188

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,20 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
189189
if (changed.compareAndSet(true, false) == false) {
190190
break;
191191
}
192+
var pendingRetries = new HashSet<ShardId>();
193+
for (ShardId shardId : pendingShardIds) {
194+
if (targetShards.getShard(shardId).remainingNodes.isEmpty()) {
195+
var failure = shardFailures.get(shardId);
196+
if (failure != null && failure.fatal == false && failure.failure instanceof NoShardAvailableActionException) {
197+
pendingRetries.add(shardId);
198+
}
199+
}
200+
}
201+
if (pendingRetries.isEmpty() == false && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) {
202+
for (var entry : resolveShards(pendingRetries).entrySet()) {
203+
targetShards.getShard(entry.getKey()).remainingNodes.addAll(entry.getValue());
204+
}
205+
}
192206
for (ShardId shardId : pendingShardIds) {
193207
if (targetShards.getShard(shardId).remainingNodes.isEmpty()) {
194208
shardFailures.compute(
@@ -257,26 +271,11 @@ private void sendOneNodeRequest(TargetShards targetShards, ComputeListener compu
257271
final ActionListener<DriverCompletionInfo> listener = computeListener.acquireCompute();
258272
sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() {
259273

260-
private final Set<ShardId> pendingRetries = new HashSet<>();
261-
262274
void onAfter(DriverCompletionInfo info) {
263275
nodePermits.get(request.node).release();
264276
if (concurrentRequests != null) {
265277
concurrentRequests.release();
266278
}
267-
268-
if (pendingRetries.isEmpty() == false && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) {
269-
try {
270-
sendingLock.lock();
271-
var resolutions = resolveShards(pendingRetries);
272-
for (var entry : resolutions.entrySet()) {
273-
targetShards.shards.get(entry.getKey()).remainingNodes.addAll(entry.getValue());
274-
}
275-
} finally {
276-
sendingLock.unlock();
277-
}
278-
}
279-
280279
trySendingRequestsForPendingShards(targetShards, computeListener);
281280
listener.onResponse(info);
282281
}
@@ -293,7 +292,6 @@ public void onResponse(DataNodeComputeResponse response) {
293292
final ShardId shardId = entry.getKey();
294293
trackShardLevelFailure(shardId, false, entry.getValue());
295294
pendingShardIds.add(shardId);
296-
maybeScheduleRetry(shardId, false, entry.getValue());
297295
}
298296
onAfter(response.completionInfo());
299297
}
@@ -303,7 +301,6 @@ public void onFailure(Exception e, boolean receivedData) {
303301
for (ShardId shardId : request.shardIds) {
304302
trackShardLevelFailure(shardId, receivedData, e);
305303
pendingShardIds.add(shardId);
306-
maybeScheduleRetry(shardId, receivedData, e);
307304
}
308305
onAfter(DriverCompletionInfo.EMPTY);
309306
}
@@ -317,14 +314,6 @@ public void onSkip() {
317314
onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()));
318315
}
319316
}
320-
321-
private void maybeScheduleRetry(ShardId shardId, boolean receivedData, Exception e) {
322-
if (receivedData == false
323-
&& targetShards.getShard(shardId).remainingNodes.isEmpty()
324-
&& unwrapFailure(shardId, e) instanceof NoShardAvailableActionException) {
325-
pendingRetries.add(shardId);
326-
}
327-
}
328317
});
329318
}
330319

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,46 @@ public void testRetryOnlyMovedShards() {
501501
assertThat("Must retry only affected shards", resolvedShards, contains(shard2));
502502
}
503503

504+
public void testRetryUnassignedShardWithoutPartialResults() {
505+
var attempt = new AtomicInteger(0);
506+
var future = sendRequests(false, -1, List.of(targetShard(shard1, node1), targetShard(shard2, node2)), shardIds -> {
507+
attempt.incrementAndGet();
508+
return Map.of(shard1, List.of());
509+
},
510+
(node, shardIds, aliasFilters, listener) -> runWithDelay(
511+
() -> listener.onResponse(
512+
Objects.equals(shardIds, List.of(shard2))
513+
? new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())
514+
: new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1)))
515+
)
516+
)
517+
518+
);
519+
expectThrows(NoShardAvailableActionException.class, containsString("no such shard"), future::actionGet);
520+
}
521+
522+
public void testRetryUnassignedShardWithPartialResults() {
523+
var response = safeGet(
524+
sendRequests(
525+
true,
526+
-1,
527+
List.of(targetShard(shard1, node1), targetShard(shard2, node2)),
528+
shardIds -> Map.of(shard1, List.of()),
529+
(node, shardIds, aliasFilters, listener) -> runWithDelay(
530+
() -> listener.onResponse(
531+
Objects.equals(shardIds, List.of(shard2))
532+
? new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())
533+
: new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1)))
534+
)
535+
)
536+
)
537+
);
538+
assertThat(response.totalShards, equalTo(2));
539+
assertThat(response.successfulShards, equalTo(1));
540+
assertThat(response.skippedShards, equalTo(0));
541+
assertThat(response.failedShards, equalTo(1));
542+
}
543+
504544
static DataNodeRequestSender.TargetShard targetShard(ShardId shardId, DiscoveryNode... nodes) {
505545
return new DataNodeRequestSender.TargetShard(shardId, new ArrayList<>(Arrays.asList(nodes)), null);
506546
}

0 commit comments

Comments
 (0)