3 changes: 0 additions & 3 deletions muted-tests.yml
@@ -414,9 +414,6 @@ tests:
- class: org.elasticsearch.packaging.test.DockerTests
method: test026InstallBundledRepositoryPluginsViaConfigFile
issue: https://github.com/elastic/elasticsearch/issues/127158
- class: org.elasticsearch.xpack.esql.plugin.DataNodeRequestSenderTests
method: testRetryOnlyMovedShards
issue: https://github.com/elastic/elasticsearch/issues/127168
- class: org.elasticsearch.xpack.esql.plugin.DataNodeRequestSenderIT
method: testSearchWhileRelocating
issue: https://github.com/elastic/elasticsearch/issues/127188

DataNodeRequestSender.java:
@@ -257,17 +257,16 @@ private void sendOneNodeRequest(TargetShards targetShards, ComputeListener compu
final ActionListener<DriverCompletionInfo> listener = computeListener.acquireCompute();
sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() {

- private final Set<ShardId> pendingRetries = new HashSet<>();
+ private Set<ShardId> pendingRetries;

void onAfter(DriverCompletionInfo info) {
nodePermits.get(request.node).release();
if (concurrentRequests != null) {
concurrentRequests.release();
}

- if (pendingRetries.isEmpty() == false && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) {
+ if (pendingRetries != null) {
try {
- sendingLock.lock();
var resolutions = resolveShards(pendingRetries);
for (var entry : resolutions.entrySet()) {
targetShards.shards.get(entry.getKey()).remainingNodes.addAll(entry.getValue());
@@ -291,19 +290,19 @@ public void onResponse(DataNodeComputeResponse response) {
}
for (var entry : response.shardLevelFailures().entrySet()) {
final ShardId shardId = entry.getKey();
- maybeScheduleRetry(shardId, false, entry.getValue());
trackShardLevelFailure(shardId, false, entry.getValue());
pendingShardIds.add(shardId);
+ maybeScheduleRetry(shardId, false, entry.getValue());
}
onAfter(response.completionInfo());
}

@Override
public void onFailure(Exception e, boolean receivedData) {
for (ShardId shardId : request.shardIds) {
- maybeScheduleRetry(shardId, receivedData, e);
trackShardLevelFailure(shardId, receivedData, e);
pendingShardIds.add(shardId);
+ maybeScheduleRetry(shardId, receivedData, e);
}
onAfter(DriverCompletionInfo.EMPTY);
}
@@ -322,7 +321,13 @@ private void maybeScheduleRetry(ShardId shardId, boolean receivedData, Exception
if (receivedData == false
&& targetShards.getShard(shardId).remainingNodes.isEmpty()
&& unwrapFailure(shardId, e) instanceof NoShardAvailableActionException) {
- pendingRetries.add(shardId);
+ if (pendingRetries == null && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) {
+     pendingRetries = new HashSet<>();
+     sendingLock.lock();

Member commented:

I am concerned that we are scattering sendingLock#lock and sendingLock#unlock in two different places. Can we keep them close?

@idegtiarenko (Contributor Author) replied, Apr 30, 2025:

They are in the same inner listener, guarding the same structure at the moment.
They were previously in the same method, but that was not enough and caused a bug.
I suspect we could do this by creating a releasable inner wrapper class on top of pendingRetries = new HashSet<>(), but that sounds like overkill and would not really help readability.

Member replied:

@idegtiarenko Sorry, I should have provided more detail. My concern is that we acquire the sending lock in maybeScheduleRetry and release it in onAfter, which is linked to the status of pendingRetries. While the implementation is technically correct, I think we should stick to the simplest lock pattern unless there is a strong reason to do otherwise:

lock/tryLock
try {
  ...
} finally {
  unlock
}

I think we can follow this lock pattern in the DataNodeRequestSender class.
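The suggested discipline can be sketched as a small self-contained Java example. Note that RetryBookkeeping and addAndCount are illustrative names invented for this sketch, not the actual DataNodeRequestSender code:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of the suggested lock discipline: acquire and release in the
// same method, with unlock() in a finally block so the lock is released
// even if the guarded code throws. Names are hypothetical.
class RetryBookkeeping {
    private final ReentrantLock sendingLock = new ReentrantLock();
    private final Set<String> pendingRetries = new HashSet<>();

    // All mutation of the shared set happens inside one lock/try/finally,
    // so no caller can observe the lock held across callback boundaries.
    int addAndCount(String shardId) {
        sendingLock.lock();
        try {
            pendingRetries.add(shardId);
            return pendingRetries.size();
        } finally {
            sendingLock.unlock();
        }
    }
}
```

The point of the pattern is purely structural: a reader can verify lock balance by inspecting a single method, rather than tracing which callback runs after which.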

@idegtiarenko (Contributor Author) replied:

This sounds a lot like the original implementation (within onAfter), which was not correct.
Do you see a way to implement this suggestion while still locking conditionally (only when shard movement is detected) and blocking concurrent requests while new shard location resolution is required?

@idegtiarenko (Contributor Author) replied:

As discussed yesterday, I moved retry scheduling to trySendingRequestsForPendingShards in 8a0dcc6

+ }
+ if (pendingRetries != null) {
+     pendingRetries.add(shardId);
+ }
}
}
});
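The direction eventually agreed on above (consolidating retry scheduling into one place so the lock is confined to a single method) can be sketched as follows. This is a simplified illustration with hypothetical names (PendingShardRetrier, recordRetry, resolvePendingShards), not the code from the commit referenced in the thread:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

// Simplified sketch: shard-level failures are recorded as they arrive,
// and resolution of moved shards happens later in one place, with the
// lock acquired and released inside a single lock/try/finally block.
class PendingShardRetrier {
    private final ReentrantLock sendingLock = new ReentrantLock();
    private final Set<String> pendingRetries = new HashSet<>();

    // Record a shard whose location may need to be re-resolved.
    void recordRetry(String shardId) {
        sendingLock.lock();
        try {
            pendingRetries.add(shardId);
        } finally {
            sendingLock.unlock();
        }
    }

    // Drain and "resolve" the pending shards; returns how many were
    // handled in this pass. The lock never escapes this method.
    int resolvePendingShards() {
        sendingLock.lock();
        try {
            int resolved = pendingRetries.size();
            pendingRetries.clear(); // stand-in for real shard resolution
            return resolved;
        } finally {
            sendingLock.unlock();
        }
    }
}
```

Compared with the diff above, this shape avoids acquiring the lock in one listener callback and releasing it in another, which was the reviewer's main concern.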