From d64927d4f5e5cd5ec296eb6c0000681602dff08f Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Tue, 22 Apr 2025 13:15:15 +0200 Subject: [PATCH 1/2] Retry shard movements during ESQL query --- .../esql/plugin/DataNodeRequestSenderIT.java | 159 +++++++++++++ .../xpack/esql/plugin/ComputeService.java | 9 +- .../esql/plugin/DataNodeComputeHandler.java | 15 +- .../esql/plugin/DataNodeRequestSender.java | 133 +++++++---- .../xpack/esql/plugin/QueryPragmas.java | 11 + .../plugin/DataNodeRequestSenderTests.java | 211 ++++++++++++++---- 6 files changed, 440 insertions(+), 98 deletions(-) create mode 100644 x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderIT.java diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderIT.java new file mode 100644 index 0000000000000..1e22d2c69c881 --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderIT.java @@ -0,0 +1,159 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plugin; + +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.compute.operator.exchange.ExchangeService; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase; +import org.elasticsearch.xpack.esql.action.EsqlQueryResponse; + +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.LongAdder; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; + +public class DataNodeRequestSenderIT extends AbstractEsqlIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class); + } + + public void testSearchWhileRelocating() throws InterruptedException { + internalCluster().ensureAtLeastNumDataNodes(3); + var primaries = randomIntBetween(1, 10); + var replicas = randomIntBetween(0, 1); + + indicesAdmin().prepareCreate("index-1").setSettings(indexSettings(primaries, replicas)).get(); + + var docs = randomIntBetween(10, 100); + var bulk = client().prepareBulk("index-1").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int i = 0; i < docs; i++) { + bulk.add(new IndexRequest().source("key", "value-1")); + } + bulk.get(); + + // start background searches + var stopped = new AtomicBoolean(false); + var queries = new LongAdder(); + var threads = new Thread[randomIntBetween(1, 5)]; + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + while (stopped.get() == false) { + try (EsqlQueryResponse resp = run("FROM index-1")) { + assertThat(getValuesList(resp), hasSize(docs)); + } + queries.increment(); + } + }); + } + for (Thread thread : threads) { + thread.start(); + } + + // start shard movements + var rounds = randomIntBetween(1, 10); + var names = internalCluster().getNodeNames(); + for (int i = 0; i < rounds; i++) { + for (String name : names) { + client().admin() + .cluster() + .prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) + .setPersistentSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", name)) + .get(); + ensureGreen("index-1"); + Thread.yield(); + } + } + + stopped.set(true); + for (Thread thread : threads) { + thread.join(10_000); + } + + client().admin() + .cluster() + .prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) + .setPersistentSettings(Settings.builder().putNull("cluster.routing.allocation.exclude._name")) + .get(); + assertThat(queries.sum(), greaterThan((long) threads.length)); + } + + public void testRetryOnShardMovement() { + internalCluster().ensureAtLeastNumDataNodes(2); + + assertAcked( + client().admin() + .indices() + .prepareCreate("index-1") + .setSettings( + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + ); + assertAcked( + client().admin() + .indices() + .prepareCreate("index-2") + .setSettings( + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + ); + client().prepareBulk("index-1") + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .add(new IndexRequest().source("key", "value-1")) + .get(); + client().prepareBulk("index-2") + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .add(new IndexRequest().source("key", "value-2")) + .get(); + + var shouldMove = new AtomicBoolean(true); + + for (TransportService transportService : internalCluster().getInstances(TransportService.class)) { + as(transportService, MockTransportService.class).addRequestHandlingBehavior( + ExchangeService.OPEN_EXCHANGE_ACTION_NAME, + (handler, request, channel, task) -> { + // move index shard + if (shouldMove.compareAndSet(true, false)) { + var currentShardNodeId = clusterService().state() + .routingTable() + .index("index-1") + .shard(0) + .primaryShard() + .currentNodeId(); + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings("index-1") + .setSettings(Settings.builder().put("index.routing.allocation.exclude._id", currentShardNodeId)) + ); + ensureGreen("index-1"); + } + // execute data node request + handler.messageReceived(request, channel, task); + } + ); + } + + try (EsqlQueryResponse resp = run("FROM " + randomFrom("index-1,index-2", "index-*"))) { + assertThat(getValuesList(resp), hasSize(2)); + } + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 4b2fab55b9f32..2ae6c80980040 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -115,7 +115,14 @@ public ComputeService( this.enrichLookupService = enrichLookupService; this.lookupFromIndexService = lookupFromIndexService; this.clusterService = clusterService; - this.dataNodeComputeHandler = new DataNodeComputeHandler(this, searchService, transportService, exchangeService, esqlExecutor); + this.dataNodeComputeHandler = new DataNodeComputeHandler( + this, + clusterService, + searchService, + transportService, + exchangeService, + esqlExecutor + ); this.clusterComputeHandler = new ClusterComputeHandler( this, exchangeService, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java index 78bca3e81ef9b..0d990ced93b38 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.RefCountingRunnable; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.compute.operator.exchange.ExchangeSink; @@ -65,6 +66,7 @@ */ final class DataNodeComputeHandler implements TransportRequestHandler { private final ComputeService computeService; + private final ClusterService clusterService; private final SearchService searchService; private final TransportService transportService; private final ExchangeService exchangeService; @@ -73,12 +75,14 @@ final class DataNodeComputeHandler implements TransportRequestHandler 0 ? new Semaphore(concurrentRequests) : null; + this.remainingUnavailableShardResolutionAttempts = new AtomicInteger( + unavailableShardResolutionAttempts >= 0 ? unavailableShardResolutionAttempts : Integer.MAX_VALUE + ); } - final void startComputeOnDataNodes( - String clusterAlias, - Set concreteIndices, - OriginalIndices originalIndices, - QueryBuilder requestFilter, - Runnable runOnTaskFailure, - ActionListener listener - ) { + final void startComputeOnDataNodes(Set concreteIndices, Runnable runOnTaskFailure, ActionListener listener) { final long startTimeInNanos = System.nanoTime(); - searchShards(rootTask, clusterAlias, requestFilter, concreteIndices, originalIndices, ActionListener.wrap(targetShards -> { - try (var computeListener = new ComputeListener(transportService.getThreadPool(), runOnTaskFailure, listener.map(profiles -> { - return new ComputeResponse( - profiles, - TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos), - targetShards.totalShards(), - targetShards.totalShards() - shardFailures.size() - skippedShards.get(), - targetShards.skippedShards() + skippedShards.get(), - shardFailures.size(), - selectFailures() - ); - }))) { - for (TargetShard shard : targetShards.shards.values()) { - for (DiscoveryNode node : shard.remainingNodes) { - nodePermits.putIfAbsent(node, new Semaphore(1)); - } - } + searchShards(concreteIndices, ActionListener.wrap(targetShards -> { + try ( + var computeListener = new ComputeListener( + transportService.getThreadPool(), + runOnTaskFailure, + listener.map( + profiles -> new ComputeResponse( + profiles, + timeValueNanos(System.nanoTime() - startTimeInNanos), + targetShards.totalShards(), + targetShards.totalShards() - shardFailures.size() - skippedShards.get(), + targetShards.skippedShards() + skippedShards.get(), + shardFailures.size(), + selectFailures() + ) + ) + ) + ) { pendingShardIds.addAll(targetShards.shards.keySet()); trySendingRequestsForPendingShards(targetShards, computeListener); } @@ -127,6 +141,20 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu if (changed.compareAndSet(true, false) == false) { break; } + var pendingRetries = new HashSet(); + for (ShardId shardId : pendingShardIds) { + if (targetShards.getShard(shardId).remainingNodes.isEmpty()) { + var failure = shardFailures.get(shardId); + if (failure != null && failure.fatal == false && failure.failure instanceof NoShardAvailableActionException) { + pendingRetries.add(shardId); + } + } + } + if (pendingRetries.isEmpty() == false && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) { + for (var entry : resolveShards(pendingRetries).entrySet()) { + targetShards.getShard(entry.getKey()).remainingNodes.addAll(entry.getValue()); + } + } for (ShardId shardId : pendingShardIds) { if (targetShards.getShard(shardId).remainingNodes.isEmpty()) { shardFailures.compute( @@ -277,7 +305,7 @@ private void trackShardLevelFailure(ShardId shardId, boolean fatal, Exception or } /** - * Result from {@link #searchShards(Task, String, QueryBuilder, Set, OriginalIndices, ActionListener)} where can_match is performed to + * Result from {@link #searchShards(Set, ActionListener)} where can_match is performed to * determine what shards can be skipped and which target nodes are needed for running the ES|QL query * * @param shards List of target shards to perform the ES|QL query on @@ -329,7 +357,7 @@ private List selectNodeRequests(TargetShards targetShards) { } if (concurrentRequests == null || concurrentRequests.tryAcquire()) { - if (nodePermits.get(node).tryAcquire()) { + if (nodePermits.computeIfAbsent(node, n -> new Semaphore(1)).tryAcquire()) { pendingRequest = new ArrayList<>(); pendingRequest.add(shard.shardId); nodeToShardIds.put(node, pendingRequest); @@ -367,22 +395,15 @@ private List selectNodeRequests(TargetShards targetShards) { * Ideally, the search_shards API should be called before the field-caps API; however, this can lead * to a situation where the column structure (i.e., matched data types) differs depending on the query. */ - void searchShards( - Task parentTask, - String clusterAlias, - QueryBuilder filter, - Set concreteIndices, - OriginalIndices originalIndices, - ActionListener listener - ) { + void searchShards(Set concreteIndices, ActionListener listener) { ActionListener searchShardsListener = listener.map(resp -> { - Map nodes = new HashMap<>(); + Map nodes = Maps.newHashMapWithExpectedSize(resp.getNodes().size()); for (DiscoveryNode node : resp.getNodes()) { nodes.put(node.getId(), node); } int totalShards = 0; int skippedShards = 0; - Map shards = new HashMap<>(); + Map shards = Maps.newHashMapWithExpectedSize(resp.getGroups().size()); for (SearchShardsGroup group : resp.getGroups()) { var shardId = group.shardId(); if (concreteIndices.contains(shardId.getIndexName()) == false) { @@ -402,10 +423,10 @@ void searchShards( } return new TargetShards(shards, totalShards, skippedShards); }); - SearchShardsRequest searchShardsRequest = new SearchShardsRequest( + var searchShardsRequest = new SearchShardsRequest( originalIndices.indices(), originalIndices.indicesOptions(), - filter, + requestFilter, null, null, true, // unavailable_shards will be handled by the sender @@ -415,9 +436,29 @@ void searchShards( transportService.getLocalNode(), EsqlSearchShardsAction.TYPE.name(), searchShardsRequest, - parentTask, + rootTask, TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(searchShardsListener, SearchShardsResponse::new, esqlExecutor) ); } + + /** + * Attempts to resolve shards locations after they have been moved + */ + Map> resolveShards(Set shardIds) { + var state = clusterService.state(); + var nodes = Maps.>newMapWithExpectedSize(shardIds.size()); + for (var shardId : shardIds) { + nodes.put( + shardId, + state.routingTable() + .shardRoutingTable(shardId) + .allShards() + .filter(shard -> shard.active() && shard.isSearchable()) + .map(shard -> state.nodes().get(shard.currentNodeId())) + .toList() + ); + } + return nodes; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java index 414f8bdb38527..563da6a64ad2b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java @@ -60,6 +60,9 @@ public final class QueryPragmas implements Writeable { public static final Setting MAX_CONCURRENT_SHARDS_PER_NODE = // Setting.intSetting("max_concurrent_shards_per_node", 10, 1, 100); + public static final Setting UNAVAILABLE_SHARD_RESOLUTION_ATTEMPTS = // + Setting.intSetting("unavailable_shard_resolution_attempts", 10, -1); + public static final Setting NODE_LEVEL_REDUCTION = Setting.boolSetting("node_level_reduction", true); public static final Setting FOLD_LIMIT = Setting.memorySizeSetting("fold_limit", "5%"); @@ -140,6 +143,14 @@ public int maxConcurrentShardsPerNode() { return MAX_CONCURRENT_SHARDS_PER_NODE.get(settings); } + /** + * Amount of attempts moved shards could be retried. + * This setting is protecting query from endlessly chasing moving shards. + */ + public int unavailableShardResolutionAttempts() { + return UNAVAILABLE_SHARD_RESOLUTION_ATTEMPTS.get(settings); + } + /** * Returns true if each data node should perform a local reduction for sort, limit, topN, stats or false if the coordinator node * will perform the reduction. diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java index c18eba5865d6e..461c11119a262 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java @@ -23,15 +23,15 @@ import org.elasticsearch.compute.test.ComputeTestCase; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; -import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.tasks.CancellableTask; -import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.esql.plugin.DataNodeRequestSender.NodeListener; import org.junit.After; import org.junit.Before; @@ -40,6 +40,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -53,6 +54,7 @@ import java.util.stream.Collectors; import static org.elasticsearch.xpack.esql.plugin.DataNodeRequestSender.NodeRequest; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -96,9 +98,9 @@ public void shutdownThreadPool() { public void testEmpty() { var future = sendRequests( - List.of(), randomBoolean(), -1, + List.of(), (node, shardIds, aliasFilters, listener) -> fail("expect no data-node request is sent") ); var resp = safeGet(future); @@ -113,7 +115,7 @@ public void testOnePass() { targetShard(shard4, node2, node3) ); Queue sent = ConcurrentCollections.newQueue(); - var future = sendRequests(targetShards, randomBoolean(), -1, (node, shardIds, aliasFilters, listener) -> { + var future = sendRequests(randomBoolean(), -1, targetShards, (node, shardIds, aliasFilters, listener) -> { sent.add(new NodeRequest(node, shardIds, aliasFilters)); runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()))); }); @@ -125,14 +127,14 @@ public void testOnePass() { public void testMissingShards() { { var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3)); - var future = sendRequests(targetShards, false, -1, (node, shardIds, aliasFilters, listener) -> { + var future = sendRequests(false, -1, targetShards, (node, shardIds, aliasFilters, listener) -> { fail("expect no data-node request is sent when target shards are missing"); }); expectThrows(NoShardAvailableActionException.class, containsString("no shard copies found"), future::actionGet); } { var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3)); - var future = sendRequests(targetShards, true, -1, (node, shardIds, aliasFilters, listener) -> { + var future = sendRequests(true, -1, targetShards, (node, shardIds, aliasFilters, listener) -> { assertThat(shard3, not(in(shardIds))); runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()))); }); @@ -156,7 +158,7 @@ public void testRetryThenSuccess() { targetShard(shard5, node1, node3, node2) ); Queue sent = ConcurrentCollections.newQueue(); - var future = sendRequests(targetShards, randomBoolean(), -1, (node, shardIds, aliasFilters, listener) -> { + var future = sendRequests(randomBoolean(), -1, targetShards, (node, shardIds, aliasFilters, listener) -> { sent.add(new NodeRequest(node, shardIds, aliasFilters)); Map failures = new HashMap<>(); if (node.equals(node1) && shardIds.contains(shard5)) { @@ -188,7 +190,7 @@ public void testRetryButFail() { targetShard(shard5, node1, node3, node2) ); Queue sent = ConcurrentCollections.newQueue(); - var future = sendRequests(targetShards, false, -1, (node, shardIds, aliasFilters, listener) -> { + var future = sendRequests(false, -1, targetShards, (node, shardIds, aliasFilters, listener) -> { sent.add(new NodeRequest(node, shardIds, aliasFilters)); Map failures = new HashMap<>(); if (shardIds.contains(shard5)) { @@ -214,7 +216,7 @@ public void testDoNotRetryOnRequestLevelFailure() { var targetShards = List.of(targetShard(shard1, node1), targetShard(shard2, node2), targetShard(shard3, node1)); Queue sent = ConcurrentCollections.newQueue(); AtomicBoolean failed = new AtomicBoolean(); - var future = sendRequests(targetShards, false, -1, (node, shardIds, aliasFilters, listener) -> { + var future = sendRequests(false, -1, targetShards, (node, shardIds, aliasFilters, listener) -> { sent.add(new NodeRequest(node, shardIds, aliasFilters)); if (node1.equals(node) && failed.compareAndSet(false, true)) { runWithDelay(() -> listener.onFailure(new IOException("test request level failure"), true)); @@ -234,7 +236,7 @@ public void testAllowPartialResults() { var targetShards = List.of(targetShard(shard1, node1), targetShard(shard2, node2), targetShard(shard3, node1, node2)); Queue sent = ConcurrentCollections.newQueue(); AtomicBoolean failed = new AtomicBoolean(); - var future = sendRequests(targetShards, true, -1, (node, shardIds, aliasFilters, listener) -> { + var future = sendRequests(true, -1, targetShards, (node, shardIds, aliasFilters, listener) -> { sent.add(new NodeRequest(node, shardIds, aliasFilters)); if (node1.equals(node) && failed.compareAndSet(false, true)) { runWithDelay(() -> listener.onFailure(new IOException("test request level failure"), true)); @@ -255,7 +257,7 @@ public void testAllowPartialResults() { public void testNonFatalErrorIsRetriedOnAnotherShard() { var targetShards = List.of(targetShard(shard1, node1, node2)); var sent = ConcurrentCollections.newQueue(); - var response = safeGet(sendRequests(targetShards, false, -1, (node, shardIds, aliasFilters, listener) -> { + var response = safeGet(sendRequests(false, -1, targetShards, (node, shardIds, aliasFilters, listener) -> { sent.add(new NodeRequest(node, shardIds, aliasFilters)); if (Objects.equals(node1, node)) { runWithDelay(() -> listener.onFailure(new RuntimeException("test request level non fatal failure"), false)); @@ -272,7 +274,7 @@ public void testNonFatalErrorIsRetriedOnAnotherShard() { public void testNonFatalFailedOnAllNodes() { var targetShards = List.of(targetShard(shard1, node1, node2)); var sent = ConcurrentCollections.newQueue(); - var future = sendRequests(targetShards, false, -1, (node, shardIds, aliasFilters, listener) -> { + var future = sendRequests(false, -1, targetShards, (node, shardIds, aliasFilters, listener) -> { sent.add(new NodeRequest(node, shardIds, aliasFilters)); runWithDelay(() -> listener.onFailure(new RuntimeException("test request level non fatal failure"), false)); }); @@ -283,7 +285,7 @@ public void testNonFatalFailedOnAllNodes() { public void testDoNotRetryCircuitBreakerException() { var targetShards = List.of(targetShard(shard1, node1, node2)); var sent = ConcurrentCollections.newQueue(); - var future = sendRequests(targetShards, false, -1, (node, shardIds, aliasFilters, listener) -> { + var future = sendRequests(false, -1, targetShards, (node, shardIds, aliasFilters, listener) -> { sent.add(new NodeRequest(node, shardIds, aliasFilters)); runWithDelay(() -> listener.onFailure(new CircuitBreakingException("cbe", randomFrom(Durability.values())), false)); }); @@ -304,7 +306,7 @@ public void testLimitConcurrentNodes() { AtomicInteger maxConcurrentRequests = new AtomicInteger(0); AtomicInteger concurrentRequests = new AtomicInteger(0); var sent = ConcurrentCollections.newQueue(); - var response = safeGet(sendRequests(targetShards, randomBoolean(), concurrency, (node, shardIds, aliasFilters, listener) -> { + var response = safeGet(sendRequests(randomBoolean(), concurrency, targetShards, (node, shardIds, aliasFilters, listener) -> { concurrentRequests.incrementAndGet(); while (true) { @@ -338,7 +340,7 @@ public void testSkipNodes() { ); AtomicInteger processed = new AtomicInteger(0); - var response = safeGet(sendRequests(targetShards, randomBoolean(), 1, (node, shardIds, aliasFilters, listener) -> { + var response = safeGet(sendRequests(randomBoolean(), 1, targetShards, (node, shardIds, aliasFilters, listener) -> { runWithDelay(() -> { if (processed.incrementAndGet() == 1) { listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())); @@ -357,7 +359,7 @@ public void testSkipRemovesPriorNonFatalErrors() { var targetShards = List.of(targetShard(shard1, node1, node2), targetShard(shard2, node3)); var sent = ConcurrentCollections.newQueue(); - var response = safeGet(sendRequests(targetShards, randomBoolean(), 1, (node, shardIds, aliasFilters, listener) -> { + var response = safeGet(sendRequests(randomBoolean(), 1, targetShards, (node, shardIds, aliasFilters, listener) -> { sent.add(new NodeRequest(node, shardIds, aliasFilters)); runWithDelay(() -> { if (Objects.equals(node.getId(), node1.getId()) && shardIds.equals(List.of(shard1))) { @@ -376,6 +378,115 @@ public void testSkipRemovesPriorNonFatalErrors() { assertThat(response.failedShards, equalTo(0)); } + public void testRetryMovedShard() { + var attempt = new AtomicInteger(0); + var response = safeGet( + sendRequests(randomBoolean(), -1, List.of(targetShard(shard1, node1)), shardIds -> switch (attempt.incrementAndGet()) { + case 1 -> Map.of(shard1, List.of(node2)); + case 2 -> Map.of(shard1, List.of(node3)); + default -> Map.of(shard1, List.of(node4)); + }, + (node, shardIds, aliasFilters, listener) -> runWithDelay( + () -> listener.onResponse( + Objects.equals(node, node4) + ? new DataNodeComputeResponse(List.of(), Map.of()) + : new DataNodeComputeResponse(List.of(), Map.of(shard1, new ShardNotFoundException(shard1))) + ) + ) + ) + ); + assertThat(response.totalShards, equalTo(1)); + assertThat(response.successfulShards, equalTo(1)); + assertThat(response.skippedShards, equalTo(0)); + assertThat(response.failedShards, equalTo(0)); + assertThat(attempt.get(), equalTo(3)); + } + + public void testDoesNotRetryMovedShardIndefinitely() { + var attempt = new AtomicInteger(0); + var response = safeGet(sendRequests(true, -1, List.of(targetShard(shard1, node1)), shardIds -> { + attempt.incrementAndGet(); + return Map.of(shard1, List.of(node2)); + }, + (node, shardIds, aliasFilters, listener) -> runWithDelay( + () -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of(shard1, new ShardNotFoundException(shard1)))) + ) + )); + assertThat(response.totalShards, equalTo(1)); + assertThat(response.successfulShards, equalTo(0)); + assertThat(response.skippedShards, equalTo(0)); + assertThat(response.failedShards, equalTo(1)); + assertThat(attempt.get(), equalTo(10)); + } + + public void testRetryOnlyMovedShards() { + var attempt = new AtomicInteger(0); + var resolvedShards = Collections.synchronizedSet(new HashSet<>()); + var response = safeGet( + sendRequests(randomBoolean(), -1, List.of(targetShard(shard1, node1, node3), targetShard(shard2, node2)), shardIds -> { + attempt.incrementAndGet(); + resolvedShards.addAll(shardIds); + return Map.of(shard2, List.of(node4)); + }, (node, shardIds, aliasFilters, listener) -> runWithDelay(() -> { + if (Objects.equals(node, node1)) { + // search is going to be retried from replica on node3 without shard resolution + listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of(shard1, new ShardNotFoundException(shard1)))); + } else if (Objects.equals(node, node2)) { + // search is going to be retried after resolving new shard node since there are no replicas + listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of(shard2, new ShardNotFoundException(shard2)))); + } else { + listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())); + } + })) + ); + assertThat(response.totalShards, equalTo(2)); + assertThat(response.successfulShards, equalTo(2)); + assertThat(response.skippedShards, equalTo(0)); + assertThat(response.failedShards, equalTo(0)); + assertThat(attempt.get(), equalTo(1)); + assertThat("Must retry only affected shards", resolvedShards, contains(shard2)); + } + + public void testRetryUnassignedShardWithoutPartialResults() { + var attempt = new AtomicInteger(0); + var future = sendRequests(false, -1, List.of(targetShard(shard1, node1), targetShard(shard2, node2)), shardIds -> { + attempt.incrementAndGet(); + return Map.of(shard1, List.of()); + }, + (node, shardIds, aliasFilters, listener) -> runWithDelay( + () -> listener.onResponse( + Objects.equals(shardIds, List.of(shard2)) + ? new DataNodeComputeResponse(List.of(), Map.of()) + : new DataNodeComputeResponse(List.of(), Map.of(shard1, new ShardNotFoundException(shard1))) + ) + ) + + ); + expectThrows(NoShardAvailableActionException.class, containsString("no such shard"), future::actionGet); + } + + public void testRetryUnassignedShardWithPartialResults() { + var response = safeGet( + sendRequests( + true, + -1, + List.of(targetShard(shard1, node1), targetShard(shard2, node2)), + shardIds -> Map.of(shard1, List.of()), + (node, shardIds, aliasFilters, listener) -> runWithDelay( + () -> listener.onResponse( + Objects.equals(shardIds, List.of(shard2)) + ? new DataNodeComputeResponse(List.of(), Map.of()) + : new DataNodeComputeResponse(List.of(), Map.of(shard1, new ShardNotFoundException(shard1))) + ) + ) + ) + ); + assertThat(response.totalShards, equalTo(2)); + assertThat(response.successfulShards, equalTo(1)); + assertThat(response.skippedShards, equalTo(0)); + assertThat(response.failedShards, equalTo(1)); + } + static DataNodeRequestSender.TargetShard targetShard(ShardId shardId, DiscoveryNode... nodes) { return new DataNodeRequestSender.TargetShard(shardId, new ArrayList<>(Arrays.asList(nodes)), null); } @@ -398,9 +509,21 @@ void runWithDelay(Runnable runnable) { } PlainActionFuture sendRequests( + boolean allowPartialResults, + int concurrentRequests, List shards, + Sender sender + ) { + return sendRequests(allowPartialResults, concurrentRequests, shards, shardIds -> { + throw new AssertionError("No shard resolution is expected here"); + }, sender); + } + + PlainActionFuture sendRequests( boolean allowPartialResults, int concurrentRequests, + List shards, + Resolver resolver, Sender sender ) { PlainActionFuture future = new PlainActionFuture<>(); @@ -415,29 +538,33 @@ PlainActionFuture sendRequests( Collections.emptyMap() ); DataNodeRequestSender requestSender = new DataNodeRequestSender( + null, transportService, executor, - "", task, + new OriginalIndices(new String[0], SearchRequest.DEFAULT_INDICES_OPTIONS), + null, + "", allowPartialResults, - concurrentRequests + concurrentRequests, + 10 ) { @Override - void searchShards( - Task parentTask, - String clusterAlias, - QueryBuilder filter, - Set concreteIndices, - OriginalIndices originalIndices, - ActionListener listener - ) { - var targetShards = new TargetShards( - shards.stream().collect(Collectors.toMap(TargetShard::shardId, Function.identity())), - shards.size(), - 0 + void searchShards(Set concreteIndices, ActionListener listener) { + runWithDelay( + () -> listener.onResponse( + new TargetShards( + shards.stream().collect(Collectors.toMap(TargetShard::shardId, Function.identity())), + shards.size(), + 0 + ) + ) ); - assertSame(parentTask, task); - runWithDelay(() -> listener.onResponse(targetShards)); + } + + @Override + Map> resolveShards(Set shardIds) { + return resolver.resolve(shardIds); } @Override @@ -450,23 +577,15 @@ protected void sendRequest( sender.sendRequestToOneNode(node, shardIds, aliasFilters, listener); } }; - requestSender.startComputeOnDataNodes( - "", - Set.of(randomAlphaOfLength(10)), - new OriginalIndices(new String[0], SearchRequest.DEFAULT_INDICES_OPTIONS), - null, - () -> {}, - future - ); + requestSender.startComputeOnDataNodes(Set.of(randomAlphaOfLength(10)), () -> {}, future); return future; } + interface Resolver { + Map> resolve(Set shardIds); + } + interface Sender { - void sendRequestToOneNode( - DiscoveryNode node, - List shardIds, - Map aliasFilters, - DataNodeRequestSender.NodeListener listener - ); + void sendRequestToOneNode(DiscoveryNode node, List shardIds, Map aliasFilters, NodeListener listener); } } From dd2e98f500738453a07e3327f1211506e799001c Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Wed, 7 May 2025 11:17:33 +0200 Subject: [PATCH 2/2] add changelog --- docs/changelog/126653.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/126653.yaml diff --git a/docs/changelog/126653.yaml b/docs/changelog/126653.yaml new file mode 100644 index 0000000000000..1497aa7a40053 --- /dev/null +++ b/docs/changelog/126653.yaml @@ -0,0 +1,5 @@ +pr: 126653 +summary: Retry shard movements during ESQL query +area: ES|QL +type: enhancement +issues: []