Merged

Changes from 8 commits

Commits (30, all by idegtiarenko):
63b19d6  Test shard movements during query (Apr 8, 2025)
90f33b8  improve test (Apr 10, 2025)
b1a04ff  cleanup (Apr 10, 2025)
545c50d  fmt (Apr 10, 2025)
8b5f590  make searchShards reusable (Apr 10, 2025)
406df8d  implement retry (Apr 10, 2025)
1e3e92d  minor cleanups (Apr 11, 2025)
f7916f7  Update docs/changelog/126653.yaml (Apr 11, 2025)
068b6be  request only relevant indices (Apr 11, 2025)
a031270  limit retry attempts (Apr 11, 2025)
1453368  Merge branch 'main' into retry_shard_movements_during_query (Apr 11, 2025)
4220622  make it possible to inject shard resolution logic (Apr 11, 2025)
2ea3292  select random pattern (Apr 11, 2025)
782c595  add unit tests (Apr 14, 2025)
8d2293e  Merge branch 'main' into retry_shard_movements_during_query (Apr 14, 2025)
54c0656  separate test for retrying only relevant shards (Apr 14, 2025)
811d8fc  remove todo (Apr 14, 2025)
c20d53c  Merge branch 'main' into retry_shard_movements_during_query (Apr 14, 2025)
56128ee  do can match only once (Apr 15, 2025)
9e2bb85  testSearchWhileRelocating (Apr 15, 2025)
333ed60  Merge branch 'main' into retry_shard_movements_during_query (Apr 16, 2025)
b8b4a95  move logic to DataNodeComputeHandler (Apr 16, 2025)
503f13e  update remaining nodes under the lock (Apr 16, 2025)
e660001  upd (Apr 16, 2025)
0908649  Merge branch 'main' into retry_shard_movements_during_query (Apr 16, 2025)
b2819b6  Merge branch 'main' into retry_shard_movements_during_query (Apr 16, 2025)
e899a10  retry only when not received data (Apr 17, 2025)
e298aaf  make resolution sync (Apr 22, 2025)
bb67124  limit retry attempts (Apr 22, 2025)
ef0ffef  Merge branch 'main' into retry_shard_movements_during_query (Apr 22, 2025)
5 changes: 5 additions & 0 deletions docs/changelog/126653.yaml
@@ -0,0 +1,5 @@
pr: 126653
summary: Retry shard movements during ESQL query
area: ES|QL
type: enhancement
issues: []
DataNodeRequestSenderIT.java (new file)
@@ -0,0 +1,87 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.plugin;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
import static org.hamcrest.Matchers.hasSize;

public class DataNodeRequestSenderIT extends AbstractEsqlIntegTestCase {
Member: Can we make this test similar to SearchWhileRelocatingIT? We continue running ES|QL on one thread while moving shards back and forth between two sets of nodes.

Contributor (author): Added.
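For reference, a minimal sketch of the shape such a test could take. This is hypothetical code, not the actual testSearchWhileRelocating from commit 9e2bb85; the updateIndexSettings helper and node-name cycling are assumptions:

// Hypothetical sketch: keep issuing ES|QL queries on one thread while the main
// thread bounces the shard between nodes via allocation filtering.
public void testSearchWhileRelocating() throws Exception {
    internalCluster().ensureAtLeastNumDataNodes(2);
    var index = randomIdentifier();
    assertAcked(
        client().admin()
            .indices()
            .prepareCreate(index)
            .setSettings(
                Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
            )
    );
    client().prepareBulk(index)
        .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
        .add(new IndexRequest().source("key", "value"))
        .get();

    var stop = new AtomicBoolean(false);
    var queries = new Thread(() -> {
        while (stop.get() == false) {
            try (EsqlQueryResponse resp = run("FROM " + index)) {
                assertThat(getValuesList(resp), hasSize(1));
            }
        }
    });
    queries.start();
    try {
        var nodes = internalCluster().getNodeNames();
        for (int i = 0; i < 10; i++) {
            // exclude one node at a time so the shard has to move
            updateIndexSettings(Settings.builder().put("index.routing.allocation.exclude._name", nodes[i % nodes.length]), index);
            ensureGreen(index);
        }
    } finally {
        stop.set(true);
        queries.join();
    }
}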


@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);
}

public void testRetryOnShardMovement() {
internalCluster().ensureAtLeastNumDataNodes(2);

var index = randomIdentifier();

assertAcked(
client().admin()
.indices()
.prepareCreate(index)
.setSettings(
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
)
);
client().prepareBulk(index)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.add(new IndexRequest().source("key", "value"))
.get();

var shouldMove = new AtomicBoolean(true);

for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
as(transportService, MockTransportService.class).addRequestHandlingBehavior(
ExchangeService.OPEN_EXCHANGE_ACTION_NAME,
(handler, request, channel, task) -> {
// move index shard
if (shouldMove.compareAndSet(true, false)) {
var currentShardNodeId = clusterService().state()
.routingTable()
.index(index)
.shard(0)
.primaryShard()
.currentNodeId();
Contributor (author): I need to check if this could be replaced by a proper API call instead.
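One candidate for that API call is an explicit reroute, sketched below; the MoveAllocationCommand signature and node-id handling are assumptions, not verified against this branch:

// Hypothetical alternative: move the shard explicitly via the reroute API
// rather than excluding its current node. Needs
// org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand.
var otherNodeId = clusterService().state()
    .nodes()
    .getDataNodes()
    .keySet()
    .stream()
    .filter(id -> id.equals(currentShardNodeId) == false)
    .findFirst()
    .orElseThrow();
assertAcked(
    client().admin()
        .cluster()
        .prepareReroute()
        .add(new MoveAllocationCommand(index, 0, currentShardNodeId, otherNodeId))
);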

assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(index)
.setSettings(Settings.builder().put("index.routing.allocation.exclude._id", currentShardNodeId))
);
ensureGreen(index);
}
// execute data node request
handler.messageReceived(request, channel, task);
}
);
}

try (EsqlQueryResponse resp = run("FROM " + index + "*")) {
assertThat(getValuesList(resp), hasSize(1));
}
}
}
DataNodeComputeHandler.java
@@ -106,6 +106,8 @@ void startComputeOnDataNodes(
esqlExecutor,
clusterAlias,
parentTask,
originalIndices,
PlannerUtils.canMatchFilter(dataNodePlan),
configuration.allowPartialResults(),
maxConcurrentNodesPerCluster == null ? -1 : maxConcurrentNodesPerCluster
) {
@@ -199,10 +201,7 @@ protected void sendRequest(
);
}
}.startComputeOnDataNodes(
clusterAlias,
concreteIndices,
originalIndices,
PlannerUtils.canMatchFilter(dataNodePlan),
runOnTaskFailure,
ActionListener.releaseAfter(outListener, exchangeSource.addEmptySink())
);
DataNodeRequestSender.java
@@ -23,14 +23,12 @@
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.FailureCollector;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
@@ -41,6 +39,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -53,6 +52,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

import static org.elasticsearch.core.TimeValue.timeValueNanos;

/**
* Handles computes within a single cluster by dispatching {@link DataNodeRequest} to data nodes
@@ -77,6 +79,7 @@ abstract class DataNodeRequestSender {
private final Executor esqlExecutor;
private final String clusterAlias;
private final CancellableTask rootTask;
private final SearchShardsRequest searchShardsRequest;
private final boolean allowPartialResults;
private final Semaphore concurrentRequests;
private final ReentrantLock sendingLock = new ReentrantLock();
@@ -85,50 +88,50 @@
private final Map<ShardId, ShardFailure> shardFailures = ConcurrentCollections.newConcurrentMap();
private final AtomicInteger skippedShards = new AtomicInteger();
private final AtomicBoolean changed = new AtomicBoolean();
private final AtomicBoolean reResolvingUnavailableShards = new AtomicBoolean(false);
private boolean reportedFailure = false; // guarded by sendingLock

DataNodeRequestSender(
TransportService transportService,
Executor esqlExecutor,
String clusterAlias,
CancellableTask rootTask,
OriginalIndices originalIndices,
QueryBuilder requestFilter,
boolean allowPartialResults,
int concurrentRequests
) {
this.transportService = transportService;
this.esqlExecutor = esqlExecutor;
this.clusterAlias = clusterAlias;
this.rootTask = rootTask;
this.searchShardsRequest = new SearchShardsRequest(
originalIndices.indices(),
originalIndices.indicesOptions(),
requestFilter,
null,
null,
true, // unavailable_shards will be handled by the sender
clusterAlias
);
this.allowPartialResults = allowPartialResults;
this.concurrentRequests = concurrentRequests > 0 ? new Semaphore(concurrentRequests) : null;
}

final void startComputeOnDataNodes(
String clusterAlias,
Set<String> concreteIndices,
OriginalIndices originalIndices,
QueryBuilder requestFilter,
Runnable runOnTaskFailure,
ActionListener<ComputeResponse> listener
) {
final void startComputeOnDataNodes(Set<String> concreteIndices, Runnable runOnTaskFailure, ActionListener<ComputeResponse> listener) {
final long startTimeInNanos = System.nanoTime();
searchShards(rootTask, clusterAlias, requestFilter, concreteIndices, originalIndices, ActionListener.wrap(targetShards -> {
searchShards(shardId -> concreteIndices.contains(shardId.getIndexName()), ActionListener.wrap(targetShards -> {
try (var computeListener = new ComputeListener(transportService.getThreadPool(), runOnTaskFailure, listener.map(profiles -> {
return new ComputeResponse(
profiles,
TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos),
timeValueNanos(System.nanoTime() - startTimeInNanos),
targetShards.totalShards(),
targetShards.totalShards() - shardFailures.size() - skippedShards.get(),
targetShards.skippedShards() + skippedShards.get(),
shardFailures.size(),
selectFailures()
);
}))) {
for (TargetShard shard : targetShards.shards.values()) {
for (DiscoveryNode node : shard.remainingNodes) {
nodePermits.putIfAbsent(node, new Semaphore(1));
}
}
pendingShardIds.addAll(order(targetShards));
trySendingRequestsForPendingShards(targetShards, computeListener);
}
@@ -171,15 +174,17 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, ComputeListener computeListener) {
if (changed.compareAndSet(true, false) == false) {
break;
}
for (ShardId shardId : pendingShardIds) {
if (targetShards.getShard(shardId).remainingNodes.isEmpty()) {
shardFailures.compute(
shardId,
(k, v) -> new ShardFailure(
true,
v == null ? new NoShardAvailableActionException(shardId, "no shard copies found") : v.failure
)
);
if (reResolvingUnavailableShards.get() == false) {
for (ShardId shardId : pendingShardIds) {
if (targetShards.getShard(shardId).remainingNodes.isEmpty()) {
shardFailures.compute(
shardId,
(k, v) -> new ShardFailure(
true,
v == null ? new NoShardAvailableActionException(shardId, "no shard copies found") : v.failure
)
);
}
}
}
if (reportedFailure
@@ -238,11 +243,29 @@ private List<ShardSearchFailure> selectFailures()
private void sendOneNodeRequest(TargetShards targetShards, ComputeListener computeListener, NodeRequest request) {
final ActionListener<List<DriverProfile>> listener = computeListener.acquireCompute();
sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() {

private final Set<ShardId> pendingRetries = new HashSet<>();

void onAfter(List<DriverProfile> profiles) {
nodePermits.get(request.node).release();
if (concurrentRequests != null) {
concurrentRequests.release();
}

// TODO limit attempts
if (pendingRetries.isEmpty() == false) {
reResolvingUnavailableShards.set(true);
// TODO narrow down search resolution to pending shards only
searchShards(pendingRetries::contains, computeListener.acquireAvoid().delegateFailure((l, newSearchShards) -> {
for (var entry : newSearchShards.shards.entrySet()) {
targetShards.shards.get(entry.getKey()).remainingNodes.addAll(entry.getValue().remainingNodes);
}
reResolvingUnavailableShards.set(false);
trySendingRequestsForPendingShards(targetShards, computeListener);
l.onResponse(null);
}));
}
Member: Should the other branch be in an else?

Contributor (author): Not necessary. We could make progress without waiting for the moved shard(s) resolution in case there are other shards in the queue.


trySendingRequestsForPendingShards(targetShards, computeListener);
listener.onResponse(profiles);
}
@@ -255,10 +278,11 @@ public void onResponse(DataNodeComputeResponse response) {
shardFailures.remove(shardId);
}
}
for (Map.Entry<ShardId, Exception> e : response.shardLevelFailures().entrySet()) {
final ShardId shardId = e.getKey();
trackShardLevelFailure(shardId, false, e.getValue());
for (var entry : response.shardLevelFailures().entrySet()) {
final ShardId shardId = entry.getKey();
trackShardLevelFailure(shardId, false, entry.getValue());
pendingShardIds.add(shardId);
maybeScheduleRetry(shardId, entry.getValue());
}
onAfter(response.profiles());
}
@@ -267,7 +291,8 @@ public void onResponse(DataNodeComputeResponse response) {
public void onFailure(Exception e, boolean receivedData) {
for (ShardId shardId : request.shardIds) {
trackShardLevelFailure(shardId, receivedData, e);
pendingShardIds.add(shardId);
pendingShardIds.add(shardId);// TODO should this shard be added only in case of non-fatal failure?
maybeScheduleRetry(shardId, e);
Member: I think we can retry only if receivedData is false.
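That would be a small guard in onFailure; a sketch, assuming maybeScheduleRetry stays as defined below:

// Hypothetical: only schedule a retry when no data was received from this node,
// since pages that were already consumed cannot be safely recomputed elsewhere.
if (receivedData == false) {
    maybeScheduleRetry(shardId, e);
}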

}
onAfter(List.of());
}
@@ -281,6 +306,13 @@ public void onSkip() {
onResponse(new DataNodeComputeResponse(List.of(), Map.of()));
}
}

private void maybeScheduleRetry(ShardId shardId, Exception e) {
if (targetShards.getShard(shardId).remainingNodes.isEmpty()
&& unwrapFailure(e) instanceof NoShardAvailableActionException) {
Contributor (author): This relies on org.elasticsearch.xpack.esql.plugin.DataNodeRequestSender#unwrapFailure, which uses:

public static boolean isShardNotAvailableException(final Throwable e) {
    final Throwable actual = ExceptionsHelper.unwrapCause(e);
    return (actual instanceof ShardNotFoundException
        || actual instanceof IndexNotFoundException
        || actual instanceof IllegalIndexShardStateException
        || actual instanceof NoShardAvailableActionException
        || actual instanceof UnavailableShardsException
        || actual instanceof AlreadyClosedException);
}

Probably this should only retry ShardNotFoundException/NoShardAvailableActionException/UnavailableShardsException. Not sure. Please let me know what you think.
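A narrowed check could look like the sketch below; which exceptions belong in the set is exactly the open question above, and the method name is hypothetical:

// Hypothetical predicate: retry only failures that suggest the shard copy moved
// or is temporarily unassigned, not index-level or closed-resource errors.
// Uses org.elasticsearch.ExceptionsHelper.
private static boolean isRetryableShardFailure(Exception e) {
    Throwable cause = ExceptionsHelper.unwrapCause(e);
    return cause instanceof ShardNotFoundException
        || cause instanceof NoShardAvailableActionException
        || cause instanceof UnavailableShardsException;
}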

pendingRetries.add(shardId);
}
}
});
}

@@ -321,7 +353,7 @@ private void trackShardLevelFailure(ShardId shardId, boolean fatal, Exception or
}

/**
* Result from {@link #searchShards(Task, String, QueryBuilder, Set, OriginalIndices, ActionListener)} where can_match is performed to
* Result from {@link #searchShards(Predicate, ActionListener)} where can_match is performed to
* determine what shards can be skipped and which target nodes are needed for running the ES|QL query
*
* @param shards List of target shards to perform the ES|QL query on
@@ -373,7 +405,7 @@ private List<NodeRequest> selectNodeRequests(TargetShards targetShards) {
}

if (concurrentRequests == null || concurrentRequests.tryAcquire()) {
if (nodePermits.get(node).tryAcquire()) {
if (nodePermits.computeIfAbsent(node, n -> new Semaphore(1)).tryAcquire()) {
pendingRequest = new ArrayList<>();
pendingRequest.add(shard.shardId);
nodeToShardIds.put(node, pendingRequest);
@@ -411,14 +443,7 @@ private List<NodeRequest> selectNodeRequests(TargetShards targetShards) {
* Ideally, the search_shards API should be called before the field-caps API; however, this can lead
* to a situation where the column structure (i.e., matched data types) differs depending on the query.
*/
void searchShards(
Task parentTask,
String clusterAlias,
QueryBuilder filter,
Set<String> concreteIndices,
OriginalIndices originalIndices,
ActionListener<TargetShards> listener
) {
void searchShards(Predicate<ShardId> predicate, ActionListener<TargetShards> listener) {
ActionListener<SearchShardsResponse> searchShardsListener = listener.map(resp -> {
Map<String, DiscoveryNode> nodes = new HashMap<>();
for (DiscoveryNode node : resp.getNodes()) {
@@ -429,15 +454,15 @@ void searchShards(
Map<ShardId, TargetShard> shards = new HashMap<>();
for (SearchShardsGroup group : resp.getGroups()) {
var shardId = group.shardId();
if (concreteIndices.contains(shardId.getIndexName()) == false) {
if (predicate.test(shardId) == false) {
continue;
}
totalShards++;
if (group.skipped()) {
skippedShards++;
continue;
}
List<DiscoveryNode> allocatedNodes = new ArrayList<>(group.allocatedNodes().size());
List<DiscoveryNode> allocatedNodes = Collections.synchronizedList(new ArrayList<>(group.allocatedNodes().size()));
Member: I wonder if we should make the logic that resolves the new target nodes a helper method and call it under sendingLock to avoid handling concurrency.
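A sketch of that shape, with a hypothetical helper name and the assumption that remainingNodes can then stay a plain ArrayList:

// Hypothetical helper: merge newly resolved nodes into the existing target shards
// while holding sendingLock, removing the need for a synchronized list.
private void addRemainingNodes(TargetShards targetShards, TargetShards newlyResolved) {
    sendingLock.lock();
    try {
        for (var entry : newlyResolved.shards.entrySet()) {
            TargetShard shard = targetShards.getShard(entry.getKey());
            if (shard != null) {
                shard.remainingNodes.addAll(entry.getValue().remainingNodes);
            }
        }
    } finally {
        sendingLock.unlock();
    }
}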

for (String n : group.allocatedNodes()) {
allocatedNodes.add(nodes.get(n));
}
@@ -446,20 +471,11 @@ void searchShards(
}
return new TargetShards(shards, totalShards, skippedShards);
});
SearchShardsRequest searchShardsRequest = new SearchShardsRequest(
originalIndices.indices(),
originalIndices.indicesOptions(),
filter,
null,
null,
true, // unavailable_shards will be handled by the sender
clusterAlias
);
transportService.sendChildRequest(
transportService.getLocalNode(),
EsqlSearchShardsAction.TYPE.name(),
searchShardsRequest,
parentTask,
rootTask,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(searchShardsListener, SearchShardsResponse::new, esqlExecutor)
);