Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/126653.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 126653
summary: Retry shard movements during ESQL query
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.plugin;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;

public class DataNodeRequestSenderIT extends AbstractEsqlIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);
}

public void testSearchWhileRelocating() throws InterruptedException {
internalCluster().ensureAtLeastNumDataNodes(3);
var primaries = randomIntBetween(1, 10);
var replicas = randomIntBetween(0, 1);

indicesAdmin().prepareCreate("index-1").setSettings(indexSettings(primaries, replicas)).get();

var docs = randomIntBetween(10, 100);
var bulk = client().prepareBulk("index-1").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < docs; i++) {
bulk.add(new IndexRequest().source("key", "value-1"));
}
bulk.get();

// start background searches
var stopped = new AtomicBoolean(false);
var queries = new LongAdder();
var threads = new Thread[randomIntBetween(1, 5)];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
while (stopped.get() == false) {
try (EsqlQueryResponse resp = run("FROM index-1")) {
assertThat(getValuesList(resp), hasSize(docs));
}
queries.increment();
}
});
}
for (Thread thread : threads) {
thread.start();
}

// start shard movements
var rounds = randomIntBetween(1, 10);
var names = internalCluster().getNodeNames();
for (int i = 0; i < rounds; i++) {
for (String name : names) {
client().admin()
.cluster()
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
.setPersistentSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", name))
.get();
ensureGreen("index-1");
Thread.yield();
}
}

stopped.set(true);
for (Thread thread : threads) {
thread.join(10_000);
}

client().admin()
.cluster()
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
.setPersistentSettings(Settings.builder().putNull("cluster.routing.allocation.exclude._name"))
.get();
assertThat(queries.sum(), greaterThan((long) threads.length));
}

public void testRetryOnShardMovement() {
internalCluster().ensureAtLeastNumDataNodes(2);

assertAcked(
client().admin()
.indices()
.prepareCreate("index-1")
.setSettings(
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
)
);
assertAcked(
client().admin()
.indices()
.prepareCreate("index-2")
.setSettings(
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
)
);
client().prepareBulk("index-1")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.add(new IndexRequest().source("key", "value-1"))
.get();
client().prepareBulk("index-2")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.add(new IndexRequest().source("key", "value-2"))
.get();

var shouldMove = new AtomicBoolean(true);

for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
as(transportService, MockTransportService.class).addRequestHandlingBehavior(
ExchangeService.OPEN_EXCHANGE_ACTION_NAME,
(handler, request, channel, task) -> {
// move index shard
if (shouldMove.compareAndSet(true, false)) {
var currentShardNodeId = clusterService().state()
.routingTable()
.index("index-1")
.shard(0)
.primaryShard()
.currentNodeId();
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings("index-1")
.setSettings(Settings.builder().put("index.routing.allocation.exclude._id", currentShardNodeId))
);
ensureGreen("index-1");
}
// execute data node request
handler.messageReceived(request, channel, task);
}
);
}

try (EsqlQueryResponse resp = run("FROM " + randomFrom("index-1,index-2", "index-*"))) {
assertThat(getValuesList(resp), hasSize(2));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,14 @@ public ComputeService(
this.enrichLookupService = enrichLookupService;
this.lookupFromIndexService = lookupFromIndexService;
this.clusterService = clusterService;
this.dataNodeComputeHandler = new DataNodeComputeHandler(this, searchService, transportService, exchangeService, esqlExecutor);
this.dataNodeComputeHandler = new DataNodeComputeHandler(
this,
clusterService,
searchService,
transportService,
exchangeService,
esqlExecutor
);
this.clusterComputeHandler = new ClusterComputeHandler(
this,
exchangeService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.compute.operator.exchange.ExchangeSink;
Expand Down Expand Up @@ -65,6 +66,7 @@
*/
final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRequest> {
private final ComputeService computeService;
private final ClusterService clusterService;
private final SearchService searchService;
private final TransportService transportService;
private final ExchangeService exchangeService;
Expand All @@ -73,12 +75,14 @@ final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRe

DataNodeComputeHandler(
ComputeService computeService,
ClusterService clusterService,
SearchService searchService,
TransportService transportService,
ExchangeService exchangeService,
Executor esqlExecutor
) {
this.computeService = computeService;
this.clusterService = clusterService;
this.searchService = searchService;
this.transportService = transportService;
this.exchangeService = exchangeService;
Expand All @@ -102,12 +106,16 @@ void startComputeOnDataNodes(
Integer maxConcurrentNodesPerCluster = PlanConcurrencyCalculator.INSTANCE.calculateNodesConcurrency(dataNodePlan, configuration);

new DataNodeRequestSender(
clusterService,
transportService,
esqlExecutor,
clusterAlias,
parentTask,
originalIndices,
PlannerUtils.canMatchFilter(dataNodePlan),
clusterAlias,
configuration.allowPartialResults(),
maxConcurrentNodesPerCluster == null ? -1 : maxConcurrentNodesPerCluster
maxConcurrentNodesPerCluster == null ? -1 : maxConcurrentNodesPerCluster,
configuration.pragmas().unavailableShardResolutionAttempts()
) {
@Override
protected void sendRequest(
Expand Down Expand Up @@ -199,10 +207,7 @@ protected void sendRequest(
);
}
}.startComputeOnDataNodes(
clusterAlias,
concreteIndices,
originalIndices,
PlannerUtils.canMatchFilter(dataNodePlan),
runOnTaskFailure,
ActionListener.runAfter(outListener, exchangeSource.addEmptySink()::close)
);
Expand Down
Loading