Skip to content

Commit c2c6105

Browse files
authored
[8.19] Retry shard movements during ESQL query (#127807)
1 parent de4e9fb commit c2c6105

File tree

7 files changed

+445
-98
lines changed

7 files changed

+445
-98
lines changed

docs/changelog/126653.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 126653
2+
summary: Retry shard movements during ESQL query
3+
area: ES|QL
4+
type: enhancement
5+
issues: []
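x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderIT.java

Lines changed: 159 additions & 0 deletions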
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.plugin;
9+
10+
import org.elasticsearch.action.index.IndexRequest;
11+
import org.elasticsearch.action.support.WriteRequest;
12+
import org.elasticsearch.cluster.metadata.IndexMetadata;
13+
import org.elasticsearch.common.settings.Settings;
14+
import org.elasticsearch.common.util.CollectionUtils;
15+
import org.elasticsearch.compute.operator.exchange.ExchangeService;
16+
import org.elasticsearch.plugins.Plugin;
17+
import org.elasticsearch.test.transport.MockTransportService;
18+
import org.elasticsearch.transport.TransportService;
19+
import org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase;
20+
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
21+
22+
import java.util.Collection;
23+
import java.util.concurrent.atomic.AtomicBoolean;
24+
import java.util.concurrent.atomic.LongAdder;
25+
26+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
27+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
28+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
29+
import static org.hamcrest.Matchers.greaterThan;
30+
import static org.hamcrest.Matchers.hasSize;
31+
32+
public class DataNodeRequestSenderIT extends AbstractEsqlIntegTestCase {
33+
34+
@Override
35+
protected Collection<Class<? extends Plugin>> nodePlugins() {
36+
return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);
37+
}
38+
39+
public void testSearchWhileRelocating() throws InterruptedException {
40+
internalCluster().ensureAtLeastNumDataNodes(3);
41+
var primaries = randomIntBetween(1, 10);
42+
var replicas = randomIntBetween(0, 1);
43+
44+
indicesAdmin().prepareCreate("index-1").setSettings(indexSettings(primaries, replicas)).get();
45+
46+
var docs = randomIntBetween(10, 100);
47+
var bulk = client().prepareBulk("index-1").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
48+
for (int i = 0; i < docs; i++) {
49+
bulk.add(new IndexRequest().source("key", "value-1"));
50+
}
51+
bulk.get();
52+
53+
// start background searches
54+
var stopped = new AtomicBoolean(false);
55+
var queries = new LongAdder();
56+
var threads = new Thread[randomIntBetween(1, 5)];
57+
for (int i = 0; i < threads.length; i++) {
58+
threads[i] = new Thread(() -> {
59+
while (stopped.get() == false) {
60+
try (EsqlQueryResponse resp = run("FROM index-1")) {
61+
assertThat(getValuesList(resp), hasSize(docs));
62+
}
63+
queries.increment();
64+
}
65+
});
66+
}
67+
for (Thread thread : threads) {
68+
thread.start();
69+
}
70+
71+
// start shard movements
72+
var rounds = randomIntBetween(1, 10);
73+
var names = internalCluster().getNodeNames();
74+
for (int i = 0; i < rounds; i++) {
75+
for (String name : names) {
76+
client().admin()
77+
.cluster()
78+
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
79+
.setPersistentSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", name))
80+
.get();
81+
ensureGreen("index-1");
82+
Thread.yield();
83+
}
84+
}
85+
86+
stopped.set(true);
87+
for (Thread thread : threads) {
88+
thread.join(10_000);
89+
}
90+
91+
client().admin()
92+
.cluster()
93+
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
94+
.setPersistentSettings(Settings.builder().putNull("cluster.routing.allocation.exclude._name"))
95+
.get();
96+
assertThat(queries.sum(), greaterThan((long) threads.length));
97+
}
98+
99+
public void testRetryOnShardMovement() {
100+
internalCluster().ensureAtLeastNumDataNodes(2);
101+
102+
assertAcked(
103+
client().admin()
104+
.indices()
105+
.prepareCreate("index-1")
106+
.setSettings(
107+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
108+
)
109+
);
110+
assertAcked(
111+
client().admin()
112+
.indices()
113+
.prepareCreate("index-2")
114+
.setSettings(
115+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
116+
)
117+
);
118+
client().prepareBulk("index-1")
119+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
120+
.add(new IndexRequest().source("key", "value-1"))
121+
.get();
122+
client().prepareBulk("index-2")
123+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
124+
.add(new IndexRequest().source("key", "value-2"))
125+
.get();
126+
127+
var shouldMove = new AtomicBoolean(true);
128+
129+
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
130+
as(transportService, MockTransportService.class).addRequestHandlingBehavior(
131+
ExchangeService.OPEN_EXCHANGE_ACTION_NAME,
132+
(handler, request, channel, task) -> {
133+
// move index shard
134+
if (shouldMove.compareAndSet(true, false)) {
135+
var currentShardNodeId = clusterService().state()
136+
.routingTable()
137+
.index("index-1")
138+
.shard(0)
139+
.primaryShard()
140+
.currentNodeId();
141+
assertAcked(
142+
client().admin()
143+
.indices()
144+
.prepareUpdateSettings("index-1")
145+
.setSettings(Settings.builder().put("index.routing.allocation.exclude._id", currentShardNodeId))
146+
);
147+
ensureGreen("index-1");
148+
}
149+
// execute data node request
150+
handler.messageReceived(request, channel, task);
151+
}
152+
);
153+
}
154+
155+
try (EsqlQueryResponse resp = run("FROM " + randomFrom("index-1,index-2", "index-*"))) {
156+
assertThat(getValuesList(resp), hasSize(2));
157+
}
158+
}
159+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,14 @@ public ComputeService(
118118
this.enrichLookupService = enrichLookupService;
119119
this.lookupFromIndexService = lookupFromIndexService;
120120
this.clusterService = clusterService;
121-
this.dataNodeComputeHandler = new DataNodeComputeHandler(this, searchService, transportService, exchangeService, esqlExecutor);
121+
this.dataNodeComputeHandler = new DataNodeComputeHandler(
122+
this,
123+
clusterService,
124+
searchService,
125+
transportService,
126+
exchangeService,
127+
esqlExecutor
128+
);
122129
this.clusterComputeHandler = new ClusterComputeHandler(
123130
this,
124131
exchangeService,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.action.support.ChannelActionListener;
1717
import org.elasticsearch.action.support.RefCountingRunnable;
1818
import org.elasticsearch.cluster.node.DiscoveryNode;
19+
import org.elasticsearch.cluster.service.ClusterService;
1920
import org.elasticsearch.compute.operator.DriverProfile;
2021
import org.elasticsearch.compute.operator.exchange.ExchangeService;
2122
import org.elasticsearch.compute.operator.exchange.ExchangeSink;
@@ -65,6 +66,7 @@
6566
*/
6667
final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRequest> {
6768
private final ComputeService computeService;
69+
private final ClusterService clusterService;
6870
private final SearchService searchService;
6971
private final TransportService transportService;
7072
private final ExchangeService exchangeService;
@@ -73,12 +75,14 @@ final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRe
7375

7476
DataNodeComputeHandler(
7577
ComputeService computeService,
78+
ClusterService clusterService,
7679
SearchService searchService,
7780
TransportService transportService,
7881
ExchangeService exchangeService,
7982
Executor esqlExecutor
8083
) {
8184
this.computeService = computeService;
85+
this.clusterService = clusterService;
8286
this.searchService = searchService;
8387
this.transportService = transportService;
8488
this.exchangeService = exchangeService;
@@ -102,12 +106,16 @@ void startComputeOnDataNodes(
102106
Integer maxConcurrentNodesPerCluster = PlanConcurrencyCalculator.INSTANCE.calculateNodesConcurrency(dataNodePlan, configuration);
103107

104108
new DataNodeRequestSender(
109+
clusterService,
105110
transportService,
106111
esqlExecutor,
107-
clusterAlias,
108112
parentTask,
113+
originalIndices,
114+
PlannerUtils.canMatchFilter(dataNodePlan),
115+
clusterAlias,
109116
configuration.allowPartialResults(),
110-
maxConcurrentNodesPerCluster == null ? -1 : maxConcurrentNodesPerCluster
117+
maxConcurrentNodesPerCluster == null ? -1 : maxConcurrentNodesPerCluster,
118+
configuration.pragmas().unavailableShardResolutionAttempts()
111119
) {
112120
@Override
113121
protected void sendRequest(
@@ -199,10 +207,7 @@ protected void sendRequest(
199207
);
200208
}
201209
}.startComputeOnDataNodes(
202-
clusterAlias,
203210
concreteIndices,
204-
originalIndices,
205-
PlannerUtils.canMatchFilter(dataNodePlan),
206211
runOnTaskFailure,
207212
ActionListener.runAfter(outListener, exchangeSource.addEmptySink()::close)
208213
);

0 commit comments

Comments
 (0)