Skip to content
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,6 @@ tests:
- class: org.elasticsearch.packaging.test.DockerTests
method: test026InstallBundledRepositoryPluginsViaConfigFile
issue: https://github.com/elastic/elasticsearch/issues/127158
- class: org.elasticsearch.xpack.esql.plugin.DataNodeRequestSenderTests
method: testRetryOnlyMovedShards
issue: https://github.com/elastic/elasticsearch/issues/127168
- class: org.elasticsearch.xpack.esql.plugin.DataNodeRequestSenderIT
method: testSearchWhileRelocating
issue: https://github.com/elastic/elasticsearch/issues/127188
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,20 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
if (changed.compareAndSet(true, false) == false) {
break;
}
var pendingRetries = new HashSet<ShardId>();
for (ShardId shardId : pendingShardIds) {
if (targetShards.getShard(shardId).remainingNodes.isEmpty()) {
var failure = shardFailures.get(shardId);
if (failure != null && failure.fatal == false && failure.failure instanceof NoShardAvailableActionException) {
pendingRetries.add(shardId);
}
}
}
if (pendingRetries.isEmpty() == false && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) {
for (var entry : resolveShards(pendingRetries).entrySet()) {
targetShards.getShard(entry.getKey()).remainingNodes.addAll(entry.getValue());
}
}
for (ShardId shardId : pendingShardIds) {
if (targetShards.getShard(shardId).remainingNodes.isEmpty()) {
shardFailures.compute(
Expand Down Expand Up @@ -257,26 +271,11 @@ private void sendOneNodeRequest(TargetShards targetShards, ComputeListener compu
final ActionListener<DriverCompletionInfo> listener = computeListener.acquireCompute();
sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() {

private final Set<ShardId> pendingRetries = new HashSet<>();

/**
 * Common epilogue for this node request; it is invoked from {@code onResponse} and {@code onFailure}
 * (and therefore also from {@code onSkip}, which delegates to {@code onResponse}).
 * <p>
 * Releases the permit held for {@code request.node} and, when {@code concurrentRequests} is set, its permit too.
 * If shards were recorded in {@code pendingRetries} and resolution attempts remain, the shards are re-resolved
 * and the returned nodes are appended to each shard's {@code remainingNodes}. Finally the sender tries to
 * dispatch requests for the pending shards and completes the listener acquired for this request.
 *
 * @param info completion info handed to the listener acquired for this node request
 */
void onAfter(DriverCompletionInfo info) {
    nodePermits.get(request.node).release();
    if (concurrentRequests != null) {
        concurrentRequests.release();
    }

    if (pendingRetries.isEmpty() == false && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) {
        // Acquire the lock BEFORE entering the try block: if lock() itself fails, the finally clause must not
        // attempt to unlock a lock this thread never obtained.
        sendingLock.lock();
        try {
            var resolutions = resolveShards(pendingRetries);
            for (var entry : resolutions.entrySet()) {
                targetShards.shards.get(entry.getKey()).remainingNodes.addAll(entry.getValue());
            }
        } finally {
            sendingLock.unlock();
        }
    }

    trySendingRequestsForPendingShards(targetShards, computeListener);
    listener.onResponse(info);
}
Expand All @@ -293,7 +292,6 @@ public void onResponse(DataNodeComputeResponse response) {
final ShardId shardId = entry.getKey();
trackShardLevelFailure(shardId, false, entry.getValue());
pendingShardIds.add(shardId);
maybeScheduleRetry(shardId, false, entry.getValue());
}
onAfter(response.completionInfo());
}
Expand All @@ -303,7 +301,6 @@ public void onFailure(Exception e, boolean receivedData) {
for (ShardId shardId : request.shardIds) {
trackShardLevelFailure(shardId, receivedData, e);
pendingShardIds.add(shardId);
maybeScheduleRetry(shardId, receivedData, e);
}
onAfter(DriverCompletionInfo.EMPTY);
}
Expand All @@ -317,14 +314,6 @@ public void onSkip() {
onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()));
}
}

/**
 * Records {@code shardId} in {@code pendingRetries} when all of the following hold: no data was received,
 * the shard has no remaining candidate nodes, and the unwrapped failure is a
 * {@link NoShardAvailableActionException}. Otherwise does nothing.
 *
 * @param shardId      the shard whose failure is being examined
 * @param receivedData whether data had already been received when the failure occurred
 * @param e            the failure reported for the shard
 */
private void maybeScheduleRetry(ShardId shardId, boolean receivedData, Exception e) {
    if (receivedData) {
        return;
    }
    // Only shards that have run out of candidate nodes are considered for a retry.
    if (targetShards.getShard(shardId).remainingNodes.isEmpty() == false) {
        return;
    }
    if (unwrapFailure(shardId, e) instanceof NoShardAvailableActionException) {
        pendingRetries.add(shardId);
    }
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,46 @@ public void testRetryOnlyMovedShards() {
assertThat("Must retry only affected shards", resolvedShards, contains(shard2));
}

/**
 * Sends requests for {@code shard1} (on {@code node1}) and {@code shard2} (on {@code node2}) with the first
 * {@code sendRequests} flag set to {@code false} (presumably "allow partial results", per the test name).
 * The request that targets only {@code shard2} succeeds; any other request reports a
 * {@link ShardNotFoundException} for {@code shard1}, and the resolver never offers a new node for it.
 * The overall request is expected to fail with a {@link NoShardAvailableActionException}.
 */
public void testRetryUnassignedShardWithoutPartialResults() {
    var future = sendRequests(
        false,
        -1,
        List.of(targetShard(shard1, node1), targetShard(shard2, node2)),
        // The resolver always answers that shard1 has no candidate nodes.
        shardIds -> Map.of(shard1, List.of()),
        (node, shardIds, aliasFilters, listener) -> runWithDelay(
            () -> listener.onResponse(
                Objects.equals(shardIds, List.of(shard2))
                    ? new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of())
                    : new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1)))
            )
        )
    );
    expectThrows(NoShardAvailableActionException.class, containsString("no such shard"), future::actionGet);
}

/**
 * Same scenario as the "without partial results" variant, but the first {@code sendRequests} flag is
 * {@code true} (presumably "allow partial results", per the test name). {@code shard2} succeeds while
 * {@code shard1} keeps failing with a {@link ShardNotFoundException} and the resolver never offers a node
 * for it. The request is expected to complete, reporting one successful and one failed shard out of two.
 */
public void testRetryUnassignedShardWithPartialResults() {
    var future = sendRequests(
        true,
        -1,
        List.of(targetShard(shard1, node1), targetShard(shard2, node2)),
        // The resolver always answers that shard1 has no candidate nodes.
        unresolved -> Map.of(shard1, List.of()),
        (targetNode, requestedShards, filters, nodeListener) -> runWithDelay(() -> {
            if (Objects.equals(requestedShards, List.of(shard2))) {
                // A request that carries only shard2 completes without shard-level failures.
                nodeListener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()));
            } else {
                nodeListener.onResponse(
                    new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1)))
                );
            }
        })
    );
    var response = safeGet(future);
    assertThat(response.totalShards, equalTo(2));
    assertThat(response.successfulShards, equalTo(1));
    assertThat(response.skippedShards, equalTo(0));
    assertThat(response.failedShards, equalTo(1));
}

/**
 * Test helper that builds a {@link DataNodeRequestSender.TargetShard} for {@code shardId}.
 * The given nodes are copied into a fresh, mutable {@link ArrayList}, and {@code null} is passed as the
 * third constructor argument (its meaning is not visible here; presumably an absent alias filter, to be confirmed).
 *
 * @param shardId the shard the target describes
 * @param nodes   candidate nodes for the shard, in the order given
 * @return a new target shard backed by its own copy of {@code nodes}
 */
static DataNodeRequestSender.TargetShard targetShard(ShardId shardId, DiscoveryNode... nodes) {
    final var candidateNodes = new ArrayList<DiscoveryNode>(Arrays.asList(nodes));
    return new DataNodeRequestSender.TargetShard(shardId, candidateNodes, null);
}
Expand Down