Skip to content

Commit e298aaf

Browse files
committed
make resolution sync
1 parent e899a10 commit e298aaf

File tree

2 files changed

+55
-80
lines changed

2 files changed

+55
-80
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 23 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,6 @@ abstract class DataNodeRequestSender {
9898
private final AtomicBoolean changed = new AtomicBoolean();
9999
private boolean reportedFailure = false; // guarded by sendingLock
100100

101-
private final AtomicInteger ongoingTargetShardResolutionAttempts = new AtomicInteger(0);
102-
private final AtomicInteger remainingTargetShardSearchAttempts = new AtomicInteger(10);
103-
104101
DataNodeRequestSender(
105102
ClusterService clusterService,
106103
ProjectResolver projectResolver,
@@ -187,17 +184,15 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu
187184
if (changed.compareAndSet(true, false) == false) {
188185
break;
189186
}
190-
if (ongoingTargetShardResolutionAttempts.get() == 0) {
191-
for (ShardId shardId : pendingShardIds) {
192-
if (targetShards.getShard(shardId).remainingNodes.isEmpty()) {
193-
shardFailures.compute(
194-
shardId,
195-
(k, v) -> new ShardFailure(
196-
true,
197-
v == null ? new NoShardAvailableActionException(shardId, "no shard copies found") : v.failure
198-
)
199-
);
200-
}
187+
for (ShardId shardId : pendingShardIds) {
188+
if (targetShards.getShard(shardId).remainingNodes.isEmpty()) {
189+
shardFailures.compute(
190+
shardId,
191+
(k, v) -> new ShardFailure(
192+
true,
193+
v == null ? new NoShardAvailableActionException(shardId, "no shard copies found") : v.failure
194+
)
195+
);
201196
}
202197
}
203198
if (reportedFailure
@@ -265,14 +260,16 @@ void onAfter(DriverCompletionInfo info) {
265260
concurrentRequests.release();
266261
}
267262

268-
if (pendingRetries.isEmpty() == false && remainingTargetShardSearchAttempts.getAndDecrement() > 0) {
269-
ongoingTargetShardResolutionAttempts.incrementAndGet();
270-
resolveShards(pendingRetries, computeListener.acquireAvoid().delegateFailure((l, resolutions) -> {
271-
addRemainingNodes(targetShards, resolutions);
272-
ongoingTargetShardResolutionAttempts.decrementAndGet();
273-
trySendingRequestsForPendingShards(targetShards, computeListener);
274-
l.onResponse(null);
275-
}));
263+
if (pendingRetries.isEmpty() == false) {
264+
try {
265+
sendingLock.lock();
266+
var resolutions = resolveShards(pendingRetries);
267+
for (var entry : resolutions.entrySet()) {
268+
targetShards.shards.get(entry.getKey()).remainingNodes.addAll(entry.getValue());
269+
}
270+
} finally {
271+
sendingLock.unlock();
272+
}
276273
}
277274

278275
trySendingRequestsForPendingShards(targetShards, computeListener);
@@ -362,17 +359,6 @@ private void trackShardLevelFailure(ShardId shardId, boolean fatal, Exception or
362359
});
363360
}
364361

365-
private void addRemainingNodes(TargetShards targetShards, Map<ShardId, List<DiscoveryNode>> resolutions) {
366-
sendingLock.lock();
367-
try {
368-
for (var entry : resolutions.entrySet()) {
369-
targetShards.shards.get(entry.getKey()).remainingNodes.addAll(entry.getValue());
370-
}
371-
} finally {
372-
sendingLock.unlock();
373-
}
374-
}
375-
376362
/**
377363
* Result from {@link #searchShards(Set, ActionListener)} where can_match is performed to
378364
* determine what shards can be skipped and which target nodes are needed for running the ES|QL query
@@ -511,11 +497,10 @@ void searchShards(Set<String> concreteIndices, ActionListener<TargetShards> list
511497
);
512498
}
513499

514-
void resolveShards(Set<ShardId> shardIds, ActionListener<Map<ShardId, List<DiscoveryNode>>> listener) {
515-
ActionListener.completeWith(listener, () -> doResolveShards(shardIds));
516-
}
517-
518-
private Map<ShardId, List<DiscoveryNode>> doResolveShards(Set<ShardId> shardIds) {
500+
/**
501+
* Attempts to resolve shards locations after they have been moved
502+
*/
503+
Map<ShardId, List<DiscoveryNode>> resolveShards(Set<ShardId> shardIds) {
519504
var project = projectResolver.getProjectState(clusterService.state());
520505
var nodes = Maps.<ShardId, List<DiscoveryNode>>newMapWithExpectedSize(shardIds.size());
521506
for (var shardId : shardIds) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java

Lines changed: 32 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -429,15 +429,11 @@ public void testQueryHotShardsFirstWhenIlmMovesShard() {
429429
public void testRetryMovedShard() {
430430
var attempt = new AtomicInteger(0);
431431
var response = safeGet(
432-
sendRequests(
433-
randomBoolean(),
434-
-1,
435-
List.of(targetShard(shard1, node1)),
436-
(shardIds, listener) -> runWithDelay(() -> listener.onResponse(switch (attempt.incrementAndGet()) {
437-
case 1 -> Map.of(shard1, List.of(node2));
438-
case 2 -> Map.of(shard1, List.of(node3));
439-
default -> Map.of(shard1, List.of(node4));
440-
})),
432+
sendRequests(randomBoolean(), -1, List.of(targetShard(shard1, node1)), shardIds -> switch (attempt.incrementAndGet()) {
433+
case 1 -> Map.of(shard1, List.of(node2));
434+
case 2 -> Map.of(shard1, List.of(node3));
435+
default -> Map.of(shard1, List.of(node4));
436+
},
441437
(node, shardIds, aliasFilters, listener) -> runWithDelay(
442438
() -> listener.onResponse(
443439
Objects.equals(node, node4)
@@ -456,10 +452,10 @@ public void testRetryMovedShard() {
456452

457453
public void testDoesNotRetryMovedShardIndefinitely() {
458454
var attempt = new AtomicInteger(0);
459-
var response = safeGet(sendRequests(true, -1, List.of(targetShard(shard1, node1)), (shardIds, listener) -> runWithDelay(() -> {
460-
logger.info("Attempt {}", attempt.incrementAndGet());
461-
listener.onResponse(Map.of(shard1, List.of(node2)));
462-
}),
455+
var response = safeGet(sendRequests(true, -1, List.of(targetShard(shard1, node1)), shardIds -> {
456+
attempt.incrementAndGet();
457+
return Map.of(shard1, List.of(node2));
458+
},
463459
(node, shardIds, aliasFilters, listener) -> runWithDelay(
464460
() -> listener.onResponse(
465461
new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1)))
@@ -477,31 +473,25 @@ public void testRetryOnlyMovedShards() {
477473
var attempt = new AtomicInteger(0);
478474
var resolvedShards = Collections.synchronizedSet(new HashSet<>());
479475
var response = safeGet(
480-
sendRequests(
481-
randomBoolean(),
482-
-1,
483-
List.of(targetShard(shard1, node1, node3), targetShard(shard2, node2)),
484-
(shardIds, listener) -> runWithDelay(() -> {
485-
attempt.incrementAndGet();
486-
resolvedShards.addAll(shardIds);
487-
listener.onResponse(Map.of(shard2, List.of(node4)));
488-
}),
489-
(node, shardIds, aliasFilters, listener) -> runWithDelay(() -> {
490-
if (Objects.equals(node, node1)) {
491-
// search is going to be retried from replica on node3 without shard resolution
492-
listener.onResponse(
493-
new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1)))
494-
);
495-
} else if (Objects.equals(node, node2)) {
496-
// search is going to be retried after resolving new shard node since there are no replicas
497-
listener.onResponse(
498-
new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard2, new ShardNotFoundException(shard2)))
499-
);
500-
} else {
501-
listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()));
502-
}
503-
})
504-
)
476+
sendRequests(randomBoolean(), -1, List.of(targetShard(shard1, node1, node3), targetShard(shard2, node2)), shardIds -> {
477+
attempt.incrementAndGet();
478+
resolvedShards.addAll(shardIds);
479+
return Map.of(shard2, List.of(node4));
480+
}, (node, shardIds, aliasFilters, listener) -> runWithDelay(() -> {
481+
if (Objects.equals(node, node1)) {
482+
// search is going to be retried from replica on node3 without shard resolution
483+
listener.onResponse(
484+
new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard1, new ShardNotFoundException(shard1)))
485+
);
486+
} else if (Objects.equals(node, node2)) {
487+
// search is going to be retried after resolving new shard node since there are no replicas
488+
listener.onResponse(
489+
new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of(shard2, new ShardNotFoundException(shard2)))
490+
);
491+
} else {
492+
listener.onResponse(new DataNodeComputeResponse(DriverCompletionInfo.EMPTY, Map.of()));
493+
}
494+
}))
505495
);
506496
assertThat(response.totalShards, equalTo(2));
507497
assertThat(response.successfulShards, equalTo(2));
@@ -547,7 +537,7 @@ PlainActionFuture<ComputeResponse> sendRequests(
547537
List<DataNodeRequestSender.TargetShard> shards,
548538
Sender sender
549539
) {
550-
return sendRequests(allowPartialResults, concurrentRequests, shards, (shardIds, listener) -> {
540+
return sendRequests(allowPartialResults, concurrentRequests, shards, shardIds -> {
551541
throw new AssertionError("No shard resolution is expected here");
552542
}, sender);
553543
}
@@ -600,8 +590,8 @@ void searchShards(Set<String> concreteIndices, ActionListener<TargetShards> list
600590
}
601591

602592
@Override
603-
void resolveShards(Set<ShardId> shardIds, ActionListener<Map<ShardId, List<DiscoveryNode>>> listener) {
604-
resolver.resolve(shardIds, listener);
593+
Map<ShardId, List<DiscoveryNode>> resolveShards(Set<ShardId> shardIds) {
594+
return resolver.resolve(shardIds);
605595
}
606596

607597
@Override
@@ -618,7 +608,7 @@ protected void sendRequest(
618608
}
619609

620610
interface Resolver {
621-
void resolve(Set<ShardId> shardIds, ActionListener<Map<ShardId, List<DiscoveryNode>>> listener);
611+
Map<ShardId, List<DiscoveryNode>> resolve(Set<ShardId> shardIds);
622612
}
623613

624614
interface Sender {

0 commit comments

Comments
 (0)