From e2a3bf13e395b0112472c1a5dae28eb691ba0cfa Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 28 Jan 2025 10:37:50 -0800 Subject: [PATCH 01/10] Allow partial results in ES|QL --- .../org/elasticsearch/TransportVersions.java | 1 + .../xpack/esql/ConfigurationTestUtils.java | 3 +- .../xpack/esql/EsqlTestUtils.java | 3 +- .../esql/action/EsqlActionBreakerIT.java | 12 +++- .../xpack/esql/action/EsqlDisruptionIT.java | 21 +++++- .../xpack/esql/action/EsqlNodeFailureIT.java | 70 ++++++++++++++---- .../xpack/esql/action/EsqlCapabilities.java | 7 +- .../xpack/esql/action/EsqlExecutionInfo.java | 3 + .../xpack/esql/action/EsqlQueryRequest.java | 9 +++ .../xpack/esql/action/EsqlQueryResponse.java | 4 ++ .../xpack/esql/action/RequestXContent.java | 2 + .../xpack/esql/plugin/ComputeService.java | 20 +++--- .../esql/plugin/DataNodeComputeHandler.java | 26 +++++-- .../esql/plugin/DataNodeRequestSender.java | 30 ++++---- .../esql/plugin/RemoteListenerGroup.java | 5 +- .../esql/plugin/TransportEsqlQueryAction.java | 3 +- .../xpack/esql/session/Configuration.java | 35 ++++++++- ...AbstractConfigurationFunctionTestCase.java | 3 +- .../function/scalar/string/ToLowerTests.java | 3 +- .../function/scalar/string/ToUpperTests.java | 3 +- .../xpack/esql/planner/EvalMapperTests.java | 3 +- .../planner/LocalExecutionPlannerTests.java | 3 +- .../plugin/DataNodeRequestSenderTests.java | 71 +++++++++++++++---- .../ConfigurationSerializationTests.java | 3 +- 24 files changed, 269 insertions(+), 74 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index e4c83dc50fb41..592367b03ca10 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -189,6 +189,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_PROFILE_ASYNC_NANOS = def(9_007_00_0); public static final 
TransportVersion ESQL_LOOKUP_JOIN_SOURCE_TEXT = def(9_008_0_00); public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR = def(9_009_0_00); + public static final TransportVersion ESQL_SUPPORT_PARTIAL_RESULTS = def(9_010_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/ConfigurationTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/ConfigurationTestUtils.java index 39e79b33327a9..c7f51692e33e4 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/ConfigurationTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/ConfigurationTestUtils.java @@ -71,7 +71,8 @@ public static Configuration randomConfiguration(String query, Map start disruption scheme [{}]", disruptionScheme); disruptionScheme.startDisrupting(); logger.info("--> executing esql query with disruption {} ", request.query()); + if (randomBoolean()) { + request.allowPartialResults(randomBoolean()); + } ActionFuture future = client().execute(EsqlQueryAction.INSTANCE, request); try { - return future.actionGet(2, TimeUnit.MINUTES); + var resp = future.actionGet(2, TimeUnit.MINUTES); + if (resp.isPartial() == false) { + return resp; + } + try (resp) { + assertTrue(request.allowPartialResults()); + } } catch (Exception ignored) { } finally { clearDisruption(); } try { - return future.actionGet(2, TimeUnit.MINUTES); + var resp = future.actionGet(2, TimeUnit.MINUTES); + if (resp.isPartial() == false) { + return resp; + } + try (resp) { + assertTrue(request.allowPartialResults()); + } } catch (Exception e) { logger.info( "running tasks: {}", @@ -113,8 +128,8 @@ private EsqlQueryResponse runQueryWithDisruption(EsqlQueryRequest request) { ensureBlocksReleased(); logger.info("--> failed to execute esql query with disruption; retrying...", e); EsqlTestUtils.assertEsqlFailure(e); - return 
client().execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES); } + return super.run(request); } private ServiceDisruptionScheme addRandomDisruptionScheme() { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java index 1118121b0becb..d14de89430589 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java @@ -16,13 +16,17 @@ import org.elasticsearch.test.FailingFieldPlugin; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.esql.EsqlTestUtils; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.in; +import static org.hamcrest.Matchers.lessThanOrEqualTo; /** * Make sure the failures on the data node come back as failures over the wire. @@ -48,10 +52,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { return settings; } - /** - * Use a runtime field that fails when loading field values to fail the entire query. 
- */ - public void testFailureLoadingFields() throws IOException { + public Set populateIndices() throws Exception { XContentBuilder mapping = JsonXContent.contentBuilder().startObject(); mapping.startObject("runtime"); { @@ -63,17 +64,62 @@ public void testFailureLoadingFields() throws IOException { mapping.endObject(); } mapping.endObject(); - client().admin().indices().prepareCreate("fail").setSettings(indexSettings(1, 0)).setMapping(mapping.endObject()).get(); - - int docCount = 50; - List docs = new ArrayList<>(docCount); - for (int d = 0; d < docCount; d++) { - docs.add(client().prepareIndex("ok").setSource("foo", d)); + client().admin().indices().prepareCreate("fail").setMapping(mapping.endObject()).get(); + int okCount = between(1, 50); + Set okIds = new HashSet<>(); + List docs = new ArrayList<>(okCount); + for (int d = 0; d < okCount; d++) { + String id = "ok-" + d; + okIds.add(id); + docs.add(client().prepareIndex("ok").setId(id).setSource("foo", d)); + } + int failCount = between(1, 50); + for (int d = 0; d < failCount; d++) { + docs.add(client().prepareIndex("fail").setId("fail-" + d).setSource("foo", d)); } - docs.add(client().prepareIndex("fail").setSource("foo", 0)); indexRandom(true, docs); + return okIds; + } + /** + * Use a runtime field that fails when loading field values to fail the entire query. 
+ */ + public void testFailureLoadingFields() throws Exception { + populateIndices(); IllegalStateException e = expectThrows(IllegalStateException.class, () -> run("FROM fail,ok | LIMIT 100").close()); assertThat(e.getMessage(), equalTo("Accessing failing field")); } + + public void testPartialResults() throws Exception { + Set okIds = populateIndices(); + { + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM fail,ok | LIMIT 100"); + request.allowPartialResults(true); + request.pragmas(randomPragmas()); + try (EsqlQueryResponse resp = run(request)) { + assertTrue(resp.isPartial()); + List> rows = EsqlTestUtils.getValuesList(resp); + assertThat(rows.size(), lessThanOrEqualTo(okIds.size())); + } + } + { + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM fail,ok METADATA _id | KEEP _id, fail_me | LIMIT 100"); + request.allowPartialResults(true); + request.pragmas(randomPragmas()); + try (EsqlQueryResponse resp = run(request)) { + assertTrue(resp.isPartial()); + List> rows = EsqlTestUtils.getValuesList(resp); + assertThat(rows.size(), lessThanOrEqualTo(okIds.size())); + Set actualIds = new HashSet<>(); + for (List row : rows) { + assertThat(row.size(), equalTo(2)); + String id = (String) row.getFirst(); + assertThat(id, in(okIds)); + assertTrue(actualIds.add(id)); + } + } + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 9d550ad328044..ad07e0a23e54c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -809,7 +809,12 @@ public enum Cap { * Fixes a series of issues with inlinestats which had an incomplete implementation after lookup and inlinestats * were refactored. 
*/ - INLINESTATS_V3(EsqlPlugin.INLINESTATS_FEATURE_FLAG); + INLINESTATS_V3(EsqlPlugin.INLINESTATS_FEATURE_FLAG), + + /** + * Support partial_results + */ + SUPPORT_PARTIAL_RESULTS; private final boolean enabled; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index e8c98322221c9..d27c30a621216 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -174,6 +174,9 @@ public TimeValue planningTookTime() { public void markEndQuery() { assert relativeStartNanos != null : "Relative start time must be set when markEndQuery is called"; overallTook = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS); + if (isPartial == false) { + isPartial = clusterInfo.values().stream().anyMatch(c -> c.failedShards != null && c.failedShards > 0); + } } // for testing only - use markEndQuery in production code diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java index 239f9e2696f88..ee557930d1c23 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java @@ -52,6 +52,7 @@ public class EsqlQueryRequest extends org.elasticsearch.xpack.core.esql.action.E private boolean keepOnCompletion; private boolean onSnapshotBuild = Build.current().isSnapshot(); private boolean acceptedPragmaRisks = false; + private boolean allowPartialResults = false; /** * "Tables" provided in the request for use with things like {@code LOOKUP}. 
@@ -231,6 +232,14 @@ public Map> tables() { return tables; } + public boolean allowPartialResults() { + return allowPartialResults; + } + + public void allowPartialResults(boolean allowPartialResults) { + this.allowPartialResults = allowPartialResults; + } + @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { // Pass the query as the description diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java index 1a82bb9b2829d..a3e89716ee6aa 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java @@ -181,6 +181,10 @@ public boolean isAsync() { return isRunning; } + public boolean isPartial() { + return executionInfo != null && executionInfo.isPartial(); + } + public EsqlExecutionInfo getExecutionInfo() { return executionInfo; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RequestXContent.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RequestXContent.java index e77d7b41aaca6..a793f39e90ee3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RequestXContent.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RequestXContent.java @@ -85,6 +85,7 @@ String fields() { static final ParseField WAIT_FOR_COMPLETION_TIMEOUT = new ParseField("wait_for_completion_timeout"); static final ParseField KEEP_ALIVE = new ParseField("keep_alive"); static final ParseField KEEP_ON_COMPLETION = new ParseField("keep_on_completion"); + static final ParseField ALLOW_PARTIAL_RESULTS = new ParseField("allow_partial_results"); private static final ObjectParser SYNC_PARSER = objectParserSync(EsqlQueryRequest::syncEsqlQueryRequest); private 
static final ObjectParser ASYNC_PARSER = objectParserAsync(EsqlQueryRequest::asyncEsqlQueryRequest); @@ -114,6 +115,7 @@ private static void objectParserCommon(ObjectParser parser) parser.declareString((request, localeTag) -> request.locale(Locale.forLanguageTag(localeTag)), LOCALE_FIELD); parser.declareBoolean(EsqlQueryRequest::profile, PROFILE_FIELD); parser.declareField((p, r, c) -> new ParseTables(r, p).parseTables(), TABLES_FIELD, ObjectParser.ValueType.OBJECT); + parser.declareBoolean(EsqlQueryRequest::allowPartialResults, ALLOW_PARTIAL_RESULTS); } private static ObjectParser objectParserSync(Supplier supplier) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 4279d0114130d..70b168ab81675 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -209,7 +209,7 @@ public void execute( transportService.getThreadPool(), cancelQueryOnFailure, computeListener.acquireCompute().delegateFailure((l, profiles) -> { - if (execInfo.isCrossClusterSearch() && execInfo.clusterAliases().contains(LOCAL_CLUSTER)) { + if (execInfo.clusterInfo.containsKey(LOCAL_CLUSTER)) { var tookTime = TimeValue.timeValueNanos(System.nanoTime() - execInfo.getRelativeStartNanos()); var status = localClusterWasInterrupted.get() ? 
EsqlExecutionInfo.Cluster.Status.PARTIAL @@ -252,16 +252,14 @@ public void execute( cancelQueryOnFailure, localListener.acquireCompute().map(r -> { localClusterWasInterrupted.set(execInfo.isPartial()); - if (execInfo.isCrossClusterSearch() && execInfo.clusterAliases().contains(LOCAL_CLUSTER)) { - execInfo.swapCluster( - LOCAL_CLUSTER, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(r.getTotalShards()) - .setSuccessfulShards(r.getSuccessfulShards()) - .setSkippedShards(r.getSkippedShards()) - .setFailedShards(r.getFailedShards()) - .build() - ); - } + execInfo.swapCluster( + LOCAL_CLUSTER, + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(r.getTotalShards()) + .setSuccessfulShards(r.getSuccessfulShards()) + .setSkippedShards(r.getSkippedShards()) + .setFailedShards(r.getFailedShards()) + .build() + ); return r.getProfiles(); }) ); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java index ee5b192bf3285..45048c4a142f1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java @@ -97,7 +97,8 @@ void startComputeOnDataNodes( Runnable runOnTaskFailure, ActionListener outListener ) { - DataNodeRequestSender sender = new DataNodeRequestSender(transportService, esqlExecutor, parentTask) { + final boolean allowPartialResults = configuration.allowPartialResults(); + DataNodeRequestSender sender = new DataNodeRequestSender(transportService, esqlExecutor, parentTask, allowPartialResults) { @Override protected void sendRequest( DiscoveryNode node, @@ -125,14 +126,28 @@ protected void sendRequest( queryPragmas.exchangeBufferSize(), esqlExecutor, listener.delegateFailureAndWrap((l, unused) -> { + final Runnable onGroupFailure; + final 
CancellableTask groupTask; + if (allowPartialResults) { + groupTask = RemoteListenerGroup.createGroupTask( + transportService, + parentTask, + () -> "compute group: data-node [" + node.getName() + "], " + shardIds + " [" + shardIds + "]" + ); + onGroupFailure = computeService.cancelQueryOnFailure(groupTask); + l = ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask)); + } else { + groupTask = parentTask; + onGroupFailure = runOnTaskFailure; + } final AtomicReference nodeResponseRef = new AtomicReference<>(); try ( - var computeListener = new ComputeListener(threadPool, runOnTaskFailure, l.map(ignored -> nodeResponseRef.get())) + var computeListener = new ComputeListener(threadPool, onGroupFailure, l.map(ignored -> nodeResponseRef.get())) ) { - final var remoteSink = exchangeService.newRemoteSink(parentTask, childSessionId, transportService, connection); + final var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, connection); exchangeSource.addRemoteSink( remoteSink, - true, + allowPartialResults == false, pagesFetched::incrementAndGet, queryPragmas.concurrentExchangeClients(), computeListener.acquireAvoid() @@ -153,7 +168,7 @@ protected void sendRequest( connection, ComputeService.DATA_ACTION_NAME, dataNodeRequest, - parentTask, + groupTask, TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(computeListener.acquireCompute().map(r -> { nodeResponseRef.set(r); @@ -238,6 +253,7 @@ public void onFailure(Exception e) { } onResponse(List.of()); } else { + // TODO: add these to fatal failures so we can continue processing other shards. 
try { exchangeService.finishSinkHandler(request.sessionId(), e); } finally { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index 2d5b4169c0215..68c16e53ed50d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -55,6 +55,7 @@ abstract class DataNodeRequestSender { private final TransportService transportService; private final Executor esqlExecutor; private final CancellableTask rootTask; + private final boolean allowPartialResults; private final ReentrantLock sendingLock = new ReentrantLock(); private final Queue pendingShardIds = ConcurrentCollections.newQueue(); private final Map nodePermits = new HashMap<>(); @@ -62,10 +63,11 @@ abstract class DataNodeRequestSender { private final AtomicBoolean changed = new AtomicBoolean(); private boolean reportedFailure = false; // guarded by sendingLock - DataNodeRequestSender(TransportService transportService, Executor esqlExecutor, CancellableTask rootTask) { + DataNodeRequestSender(TransportService transportService, Executor esqlExecutor, CancellableTask rootTask, boolean allowPartialResults) { this.transportService = transportService; this.esqlExecutor = esqlExecutor; this.rootTask = rootTask; + this.allowPartialResults = allowPartialResults; } final void startComputeOnDataNodes( @@ -80,13 +82,14 @@ final void startComputeOnDataNodes( searchShards(rootTask, clusterAlias, requestFilter, concreteIndices, originalIndices, ActionListener.wrap(targetShards -> { try (var computeListener = new ComputeListener(transportService.getThreadPool(), runOnTaskFailure, listener.map(profiles -> { TimeValue took = TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos); + final int failedShards = 
shardFailures.size(); return new ComputeResponse( profiles, took, targetShards.totalShards(), - targetShards.totalShards(), + targetShards.totalShards() - failedShards, targetShards.skippedShards(), - 0 + failedShards ); }))) { for (TargetShard shard : targetShards.shards.values()) { @@ -110,17 +113,16 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu break; } for (ShardId shardId : pendingShardIds) { - if (targetShards.getShard(shardId).remainingNodes.isEmpty()) { - shardFailures.compute( - shardId, - (k, v) -> new ShardFailure( - true, - v == null ? new NoShardAvailableActionException(shardId, "no shard copies found") : v.failure - ) - ); - } + if (targetShards.getShard(shardId).remainingNodes.isEmpty()) shardFailures.compute( + shardId, + (k, v) -> new ShardFailure( + true, + v == null ? new NoShardAvailableActionException(shardId, "no shard copies found") : v.failure + ) + ); } - if (reportedFailure || shardFailures.values().stream().anyMatch(shardFailure -> shardFailure.fatal)) { + if (reportedFailure + || (allowPartialResults == false && shardFailures.values().stream().anyMatch(shardFailure -> shardFailure.fatal))) { reportedFailure = true; reportFailures(computeListener); } else { @@ -345,7 +347,7 @@ void searchShards( filter, null, null, - false, + true, // unavailable_shards will be handled by the sender clusterAlias ); transportService.sendChildRequest( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java index 9ef085257b87b..e5793cc51fad8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java @@ -50,7 +50,7 @@ class RemoteListenerGroup { this.taskManager = transportService.getTaskManager(); this.clusterAlias = clusterAlias; 
this.executionInfo = executionInfo; - groupTask = createGroupTask(rootTask, () -> rootTask.getDescription() + "[" + clusterAlias + "]"); + groupTask = createGroupTask(transportService, rootTask, () -> rootTask.getDescription() + "[" + clusterAlias + "]"); CountDown countDown = new CountDown(2); // The group is done when both the sink and the cluster request are done Runnable finishGroup = () -> { @@ -92,7 +92,8 @@ public ActionListener> getClusterRequestListener() { return clusterRequestListener; } - private CancellableTask createGroupTask(Task parentTask, Supplier description) { + public static CancellableTask createGroupTask(TransportService transportService, Task parentTask, Supplier description) { + final TaskManager taskManager = transportService.getTaskManager(); return (CancellableTask) taskManager.register( "transport", "esql_compute_group", diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 5b0dfa14014a6..5dd8b91cf30d5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -206,7 +206,8 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener> tables; private final long queryStartTimeNanos; @@ -65,7 +67,8 @@ public Configuration( String query, boolean profile, Map> tables, - long queryStartTimeNanos + long queryStartTimeNanos, + boolean allowPartialResults ) { this.zoneId = zi.normalized(); this.now = ZonedDateTime.now(Clock.tick(Clock.system(zoneId), Duration.ofNanos(1))); @@ -80,6 +83,7 @@ public Configuration( this.tables = tables; assert tables != null; this.queryStartTimeNanos = queryStartTimeNanos; + this.allowPartialResults = allowPartialResults; } public Configuration(BlockStreamInput in) throws 
IOException { @@ -107,6 +111,11 @@ public Configuration(BlockStreamInput in) throws IOException { } else { this.queryStartTimeNanos = -1; } + if (supportPartialResults(in.getTransportVersion())) { + this.allowPartialResults = in.readBoolean(); + } else { + this.allowPartialResults = false; + } } @Override @@ -131,6 +140,11 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) { out.writeLong(queryStartTimeNanos); } + if (supportPartialResults(out.getTransportVersion())) { + out.writeBoolean(allowPartialResults); + } else if (allowPartialResults) { + throw new IllegalArgumentException("allow_partial_result is not supported in this version"); + } } public ZoneId zoneId() { @@ -206,6 +220,13 @@ public boolean profile() { return profile; } + /** + * Whether this request can return partial results instead of failing fast on failures + */ + public boolean allowPartialResults() { + return allowPartialResults; + } + private static void writeQuery(StreamOutput out, String query) throws IOException { if (query.length() > QUERY_COMPRESS_THRESHOLD_CHARS) { // compare on chars to avoid UTF-8 encoding unless actually required out.writeBoolean(true); @@ -244,7 +265,8 @@ public boolean equals(Object o) { && Objects.equals(locale, that.locale) && Objects.equals(that.query, query) && profile == that.profile - && tables.equals(that.tables); + && tables.equals(that.tables) + && allowPartialResults == that.allowPartialResults; } @Override @@ -260,7 +282,8 @@ public int hashCode() { locale, query, profile, - tables + tables, + allowPartialResults ); } @@ -282,6 +305,12 @@ public String toString() { + profile + ", tables=" + tables + + "allow_partial_result=" + + allowPartialResults + '}'; } + + public static boolean supportPartialResults(TransportVersion transportVersion) { + return transportVersion.onOrAfter(TransportVersions.ESQL_SUPPORT_PARTIAL_RESULTS); + } } diff --git 
a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/AbstractConfigurationFunctionTestCase.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/AbstractConfigurationFunctionTestCase.java index a3a18d7a30b59..8cc9091dae90d 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/AbstractConfigurationFunctionTestCase.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/AbstractConfigurationFunctionTestCase.java @@ -43,7 +43,8 @@ static Configuration randomConfiguration() { StringUtils.EMPTY, randomBoolean(), Map.of(), - System.nanoTime() + System.nanoTime(), + randomBoolean() ); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToLowerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToLowerTests.java index f779dd038454d..a846ab7cb4aa7 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToLowerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToLowerTests.java @@ -70,7 +70,8 @@ private Configuration randomLocaleConfig() { "", false, Map.of(), - System.nanoTime() + System.nanoTime(), + randomBoolean() ); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToUpperTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToUpperTests.java index 3957c2e1fb2c0..da963f76b8dd6 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToUpperTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToUpperTests.java @@ -70,7 +70,8 @@ private Configuration 
randomLocaleConfig() { "", false, Map.of(), - System.nanoTime() + System.nanoTime(), + randomBoolean() ); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EvalMapperTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EvalMapperTests.java index 6dc1eac2e5814..3ea56d6d15abd 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EvalMapperTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EvalMapperTests.java @@ -78,7 +78,8 @@ public class EvalMapperTests extends ESTestCase { StringUtils.EMPTY, false, Map.of(), - System.nanoTime() + System.nanoTime(), + false ); @ParametersFactory(argumentFormatting = "%1$s") diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java index 7e5143d5a3ac0..d753045bee7cf 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java @@ -202,7 +202,8 @@ private Configuration config() { StringUtils.EMPTY, false, Map.of(), - System.nanoTime() + System.nanoTime(), + randomBoolean() ); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java index e181d9bb34955..e34deb4d55f20 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java @@ -52,6 +52,8 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static 
org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.in; +import static org.hamcrest.Matchers.not; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -88,7 +90,11 @@ public void shutdownThreadPool() throws Exception { } public void testEmpty() { - var future = sendRequests(List.of(), (node, shardIds, aliasFilters, listener) -> fail("expect no data-node request is sent")); + var future = sendRequests( + List.of(), + randomBoolean(), + (node, shardIds, aliasFilters, listener) -> fail("expect no data-node request is sent") + ); var resp = safeGet(future); assertThat(resp.totalShards, equalTo(0)); } @@ -101,7 +107,7 @@ public void testOnePass() { targetShard(shard4, node2, node3) ); Queue sent = ConcurrentCollections.newQueue(); - var future = sendRequests(targetShards, (node, shardIds, aliasFilters, listener) -> { + var future = sendRequests(targetShards, randomBoolean(), (node, shardIds, aliasFilters, listener) -> { sent.add(new NodeRequest(node, shardIds, aliasFilters)); var resp = new DataNodeComputeResponse(List.of(), Map.of()); runWithDelay(() -> listener.onResponse(resp)); @@ -112,12 +118,25 @@ public void testOnePass() { } public void testMissingShards() { - var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3)); - var future = sendRequests(targetShards, (node, shardIds, aliasFilters, listener) -> { - fail("expect no data-node request is sent when target shards are missing"); - }); - var error = expectThrows(NoShardAvailableActionException.class, future::actionGet); - assertThat(error.getMessage(), containsString("no shard copies found")); + { + var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3)); + var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> { + fail("expect no data-node request is sent when target shards are missing"); + }); + var error = 
expectThrows(NoShardAvailableActionException.class, future::actionGet); + assertThat(error.getMessage(), containsString("no shard copies found")); + } + { + var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3)); + var future = sendRequests(targetShards, true, (node, shardIds, aliasFilters, listener) -> { + assertThat(shard3, not(in(shardIds))); + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()))); + }); + ComputeResponse resp = safeGet(future); + assertThat(resp.totalShards, equalTo(3)); + assertThat(resp.failedShards, equalTo(1)); + assertThat(resp.successfulShards, equalTo(2)); + } } public void testRetryThenSuccess() { @@ -129,7 +148,7 @@ public void testRetryThenSuccess() { targetShard(shard5, node1, node3, node2) ); Queue sent = ConcurrentCollections.newQueue(); - var future = sendRequests(targetShards, (node, shardIds, aliasFilters, listener) -> { + var future = sendRequests(targetShards, randomBoolean(), (node, shardIds, aliasFilters, listener) -> { sent.add(new NodeRequest(node, shardIds, aliasFilters)); Map failures = new HashMap<>(); if (node.equals(node1) && shardIds.contains(shard5)) { @@ -161,7 +180,7 @@ public void testRetryButFail() { targetShard(shard5, node1, node3, node2) ); Queue sent = ConcurrentCollections.newQueue(); - var future = sendRequests(targetShards, (node, shardIds, aliasFilters, listener) -> { + var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> { sent.add(new NodeRequest(node, shardIds, aliasFilters)); Map failures = new HashMap<>(); if (shardIds.contains(shard5)) { @@ -187,7 +206,7 @@ public void testDoNotRetryOnRequestLevelFailure() { var targetShards = List.of(targetShard(shard1, node1), targetShard(shard2, node2), targetShard(shard3, node1)); Queue sent = ConcurrentCollections.newQueue(); AtomicBoolean failed = new AtomicBoolean(); - var future = sendRequests(targetShards, (node, shardIds, 
aliasFilters, listener) -> { + var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> { sent.add(new NodeRequest(node, shardIds, aliasFilters)); if (node1.equals(node) && failed.compareAndSet(false, true)) { runWithDelay(() -> listener.onFailure(new IOException("test request level failure"), true)); @@ -203,6 +222,28 @@ public void testDoNotRetryOnRequestLevelFailure() { assertThat(firstRound, equalTo(Map.of(node1, List.of(shard1, shard3), node2, List.of(shard2)))); } + public void testAllowPartialResults() { + var targetShards = List.of(targetShard(shard1, node1), targetShard(shard2, node2), targetShard(shard3, node1, node2)); + Queue sent = ConcurrentCollections.newQueue(); + AtomicBoolean failed = new AtomicBoolean(); + var future = sendRequests(targetShards, true, (node, shardIds, aliasFilters, listener) -> { + sent.add(new NodeRequest(node, shardIds, aliasFilters)); + if (node1.equals(node) && failed.compareAndSet(false, true)) { + runWithDelay(() -> listener.onFailure(new IOException("test request level failure"), true)); + } else { + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()))); + } + }); + ComputeResponse resp = safeGet(future); + // one round: {node-1, node-2} + assertThat(sent.size(), equalTo(2)); + var firstRound = groupRequests(sent, 2); + assertThat(firstRound, equalTo(Map.of(node1, List.of(shard1, shard3), node2, List.of(shard2)))); + assertThat(resp.totalShards, equalTo(3)); + assertThat(resp.failedShards, equalTo(2)); + assertThat(resp.successfulShards, equalTo(1)); + } + static DataNodeRequestSender.TargetShard targetShard(ShardId shardId, DiscoveryNode... 
nodes) { return new DataNodeRequestSender.TargetShard(shardId, new ArrayList<>(Arrays.asList(nodes)), null); } @@ -224,7 +265,11 @@ void runWithDelay(Runnable runnable) { } } - PlainActionFuture sendRequests(List shards, Sender sender) { + PlainActionFuture sendRequests( + List shards, + boolean allowPartialResults, + Sender sender + ) { PlainActionFuture future = new PlainActionFuture<>(); TransportService transportService = mock(TransportService.class); when(transportService.getThreadPool()).thenReturn(threadPool); @@ -236,7 +281,7 @@ PlainActionFuture sendRequests(List Date: Thu, 13 Feb 2025 13:34:33 -0800 Subject: [PATCH 02/10] Update docs/changelog/121942.yaml --- docs/changelog/121942.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/121942.yaml diff --git a/docs/changelog/121942.yaml b/docs/changelog/121942.yaml new file mode 100644 index 0000000000000..4973ebbb4f26c --- /dev/null +++ b/docs/changelog/121942.yaml @@ -0,0 +1,5 @@ +pr: 121942 +summary: Allow partial results in ES|QL +area: ES|QL +type: enhancement +issues: [] From 72bc1ea06679d162bb4bb5157b5425ddeaa7243a Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 14 Feb 2025 12:58:54 -0800 Subject: [PATCH 03/10] mark isPartial at the end --- .../xpack/esql/action/EsqlExecutionInfo.java | 9 ++++++++- .../xpack/esql/plugin/TransportEsqlQueryAction.java | 10 ---------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index d27c30a621216..16290b2987811 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -175,7 +175,14 @@ public void markEndQuery() { assert relativeStartNanos != null : "Relative start time must be set when 
markEndQuery is called"; overallTook = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS); if (isPartial == false) { - isPartial = clusterInfo.values().stream().anyMatch(c -> c.failedShards != null && c.failedShards > 0); + // TODO: Mark individual clusters as partial if failed shards exist + isPartial = clusterInfo.values() + .stream() + .anyMatch( + c -> c.status == Cluster.Status.PARTIAL + || c.status == Cluster.Status.SKIPPED + || c.failedShards != null && c.failedShards > 0 + ); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 5dd8b91cf30d5..baf351c27107c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -234,16 +234,6 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener { - // If we had any skipped or partial clusters, the result is partial - if (executionInfo.getClusters() - .values() - .stream() - .anyMatch( - c -> c.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED - || c.getStatus() == EsqlExecutionInfo.Cluster.Status.PARTIAL - )) { - executionInfo.markAsPartial(); - } recordCCSTelemetry(task, executionInfo, request, null); listener.onResponse(toResponse(task, request, configuration, result)); }, ex -> { From ea784cdd54962a6201f74728f0a9f8ad291e6887 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 14 Feb 2025 12:59:59 -0800 Subject: [PATCH 04/10] inline --- .../elasticsearch/xpack/esql/session/Configuration.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Configuration.java 
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Configuration.java index 0930f64c1d041..90dc61a6bfdaf 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Configuration.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Configuration.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.esql.session; -import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.compress.CompressorFactory; @@ -111,7 +110,7 @@ public Configuration(BlockStreamInput in) throws IOException { } else { this.queryStartTimeNanos = -1; } - if (supportPartialResults(in.getTransportVersion())) { + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_SUPPORT_PARTIAL_RESULTS)) { this.allowPartialResults = in.readBoolean(); } else { this.allowPartialResults = false; @@ -140,7 +139,7 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) { out.writeLong(queryStartTimeNanos); } - if (supportPartialResults(out.getTransportVersion())) { + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_SUPPORT_PARTIAL_RESULTS)) { out.writeBoolean(allowPartialResults); } else if (allowPartialResults) { throw new IllegalArgumentException("allow_partial_result is not supported in this version"); @@ -310,7 +309,4 @@ public String toString() { + '}'; } - public static boolean supportPartialResults(TransportVersion transportVersion) { - return transportVersion.onOrAfter(TransportVersions.ESQL_SUPPORT_PARTIAL_RESULTS); - } } From 8bb4cfd4624284cd70888c465f3ec8dfe3c3ddbf Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 14 Feb 2025 13:12:44 -0800 Subject: [PATCH 05/10] Add comment --- .../esql/action/EsqlActionBreakerIT.java | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git 
a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java index 626fa98086f0e..57f6b55d31845 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java @@ -95,11 +95,12 @@ private EsqlQueryResponse runWithBreaking(EsqlQueryRequest request) throws Circu @Override protected EsqlQueryResponse run(EsqlQueryRequest request) { + if (randomBoolean()) { + request.allowPartialResults(randomBoolean()); + } + Exception failure = null; try { - if (randomBoolean()) { - request.allowPartialResults(randomBoolean()); - } - var resp = runWithBreaking(request); + final EsqlQueryResponse resp = runWithBreaking(request); if (resp.isPartial() == false) { return resp; } @@ -107,14 +108,18 @@ protected EsqlQueryResponse run(EsqlQueryRequest request) { assertTrue(request.allowPartialResults()); } } catch (Exception e) { - try (EsqlQueryResponse resp = super.run(request)) { - assertThat(e, instanceOf(CircuitBreakingException.class)); - assertThat(ExceptionsHelper.status(e), equalTo(RestStatus.TOO_MANY_REQUESTS)); - resp.incRef(); - return resp; + failure = e; + } + // Re-run if the previous query failed or returned partial results + // Only check the previous failure if the second query succeeded + try (EsqlQueryResponse resp = super.run(request)) { + if (failure != null) { + assertThat(failure, instanceOf(CircuitBreakingException.class)); + assertThat(ExceptionsHelper.status(failure), equalTo(RestStatus.TOO_MANY_REQUESTS)); } + resp.incRef(); + return resp; } - return super.run(request); } /** From e1a7d06e65f65ca7e64e28448b06e04c8327956f Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 14 Feb 2025 13:13:50 -0800 Subject: [PATCH 06/10] revert --- 
.../xpack/esql/plugin/DataNodeRequestSender.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index 68c16e53ed50d..a5b8c13f9a730 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -113,13 +113,15 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu break; } for (ShardId shardId : pendingShardIds) { - if (targetShards.getShard(shardId).remainingNodes.isEmpty()) shardFailures.compute( - shardId, - (k, v) -> new ShardFailure( - true, - v == null ? new NoShardAvailableActionException(shardId, "no shard copies found") : v.failure - ) - ); + if (targetShards.getShard(shardId).remainingNodes.isEmpty()) { + shardFailures.compute( + shardId, + (k, v) -> new ShardFailure( + true, + v == null ? 
new NoShardAvailableActionException(shardId, "no shard copies found") : v.failure + ) + ); + } } if (reportedFailure || (allowPartialResults == false && shardFailures.values().stream().anyMatch(shardFailure -> shardFailure.fatal))) { From 926382977478b820944207d5963b3986e66226b5 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 14 Feb 2025 13:52:29 -0800 Subject: [PATCH 07/10] allow partial_result --- .../org/elasticsearch/xpack/esql/session/Configuration.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Configuration.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Configuration.java index 90dc61a6bfdaf..b08eabda21cfc 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Configuration.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Configuration.java @@ -141,8 +141,6 @@ public void writeTo(StreamOutput out) throws IOException { } if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_SUPPORT_PARTIAL_RESULTS)) { out.writeBoolean(allowPartialResults); - } else if (allowPartialResults) { - throw new IllegalArgumentException("allow_partial_result is not supported in this version"); } } From 508d3ebbe626c48127aa387038c9088ce0962f63 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 14 Feb 2025 19:26:32 -0800 Subject: [PATCH 08/10] update `isPartial` --- .../xpack/esql/action/EsqlExecutionInfo.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index fd98533858e2b..d7b42e7cb9aca 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ 
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -175,16 +175,6 @@ public TimeValue planningTookTime() { public void markEndQuery() { assert relativeStartNanos != null : "Relative start time must be set when markEndQuery is called"; overallTook = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS); - if (isPartial == false) { - // TODO: Mark individual clusters as partial if failed shards exist - isPartial = clusterInfo.values() - .stream() - .anyMatch( - c -> c.status == Cluster.Status.PARTIAL - || c.status == Cluster.Status.SKIPPED - || c.failedShards != null && c.failedShards > 0 - ); - } } // for testing only - use markEndQuery in production code @@ -256,7 +246,15 @@ public Map getClusters() { * @return the new Cluster object */ public Cluster swapCluster(String clusterAlias, BiFunction remappingFunction) { - return clusterInfo.compute(clusterAlias, remappingFunction); + return clusterInfo.compute(clusterAlias, (unused, oldCluster) -> { + final Cluster newCluster = remappingFunction.apply(clusterAlias, oldCluster); + if (isPartial == false) { + isPartial = newCluster.getStatus() == Cluster.Status.PARTIAL + || newCluster.getStatus() == Cluster.Status.SKIPPED + || (newCluster.failedShards != null && newCluster.failedShards > 0); + } + return newCluster; + }); } @Override From 4ec55c1ca08c5760aafc3c4b8801e6d18d1d94ab Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 14 Feb 2025 21:06:27 -0800 Subject: [PATCH 09/10] update `isPartial` --- .../xpack/esql/action/EsqlExecutionInfo.java | 24 +++++-------------- .../xpack/esql/plugin/ComputeService.java | 18 +++++++------- 2 files changed, 14 insertions(+), 28 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index d7b42e7cb9aca..76a790b25c8d2 100644 --- 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -248,10 +248,8 @@ public Map getClusters() { public Cluster swapCluster(String clusterAlias, BiFunction remappingFunction) { return clusterInfo.compute(clusterAlias, (unused, oldCluster) -> { final Cluster newCluster = remappingFunction.apply(clusterAlias, oldCluster); - if (isPartial == false) { - isPartial = newCluster.getStatus() == Cluster.Status.PARTIAL - || newCluster.getStatus() == Cluster.Status.SKIPPED - || (newCluster.failedShards != null && newCluster.failedShards > 0); + if (newCluster != null && isPartial == false) { + isPartial = newCluster.isPartial(); } return newCluster; }); @@ -313,13 +311,6 @@ public boolean isPartial() { return isPartial; } - /** - * Mark the query as having partial results. - */ - public void markAsPartial() { - isPartial = true; - } - public void markAsStopped() { isStopped = true; } @@ -328,13 +319,6 @@ public boolean isStopped() { return isStopped; } - /** - * Mark this cluster as having partial results. - */ - public void markClusterAsPartial(String clusterAlias) { - swapCluster(clusterAlias, (k, v) -> new Cluster.Builder(v).setStatus(Cluster.Status.PARTIAL).build()); - } - /** * Represents the search metadata about a particular cluster involved in a cross-cluster search. * The Cluster object can represent either the local cluster or a remote cluster. 
@@ -626,6 +610,10 @@ public List getFailures() { return failures; } + boolean isPartial() { + return status == Status.PARTIAL || status == Status.SKIPPED || (failedShards != null && failedShards > 0); + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 5ca91341e0b5a..a9a3be7ecab1c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -252,16 +252,14 @@ public void execute( cancelQueryOnFailure, localListener.acquireCompute().map(r -> { localClusterWasInterrupted.set(execInfo.isStopped()); - if (execInfo.isCrossClusterSearch() && execInfo.clusterAliases().contains(LOCAL_CLUSTER)) { - execInfo.swapCluster( - LOCAL_CLUSTER, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(r.getTotalShards()) - .setSuccessfulShards(r.getSuccessfulShards()) - .setSkippedShards(r.getSkippedShards()) - .setFailedShards(r.getFailedShards()) - .build() - ); - } + execInfo.swapCluster( + LOCAL_CLUSTER, + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(r.getTotalShards()) + .setSuccessfulShards(r.getSuccessfulShards()) + .setSkippedShards(r.getSkippedShards()) + .setFailedShards(r.getFailedShards()) + .build() + ); return r.getProfiles(); }) ); From d2bc582b9f40de629d5c375b1bbcd06d2a9c3ed1 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 14 Feb 2025 21:46:46 -0800 Subject: [PATCH 10/10] disruption --- .../xpack/esql/action/EsqlDisruptionIT.java | 59 ++++++++++--------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlDisruptionIT.java 
b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlDisruptionIT.java index 2247655e14469..f26717c5674e1 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlDisruptionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlDisruptionIT.java @@ -87,48 +87,53 @@ private EsqlQueryResponse runQueryWithDisruption(EsqlQueryRequest request) { request.allowPartialResults(randomBoolean()); } ActionFuture future = client().execute(EsqlQueryAction.INSTANCE, request); + EsqlQueryResponse resp = null; try { - var resp = future.actionGet(2, TimeUnit.MINUTES); + resp = future.actionGet(2, TimeUnit.MINUTES); if (resp.isPartial() == false) { return resp; } - try (resp) { - assertTrue(request.allowPartialResults()); - } } catch (Exception ignored) { } finally { clearDisruption(); } - try { - var resp = future.actionGet(2, TimeUnit.MINUTES); + // wait for the response after clear disruption + if (resp == null) { + try { + resp = future.actionGet(2, TimeUnit.MINUTES); + } catch (Exception e) { + logger.info( + "running tasks: {}", + client().admin() + .cluster() + .prepareListTasks() + .get() + .getTasks() + .stream() + .filter( + // Skip the tasks that'd get in the way while debugging + t -> false == t.action().contains(TransportListTasksAction.TYPE.name()) + && false == t.action().contains(HealthNode.TASK_NAME) + ) + .toList() + ); + assertTrue("request must be failed or completed after clearing disruption", future.isDone()); + ensureBlocksReleased(); + logger.info("--> failed to execute esql query with disruption; retrying...", e); + EsqlTestUtils.assertEsqlFailure(e); + } + } + // use the response if it's not partial + if (resp != null) { if (resp.isPartial() == false) { return resp; } - try (resp) { + try (var ignored = resp) { assertTrue(request.allowPartialResults()); } - } catch (Exception e) { - logger.info( - "running tasks: {}", - 
client().admin() - .cluster() - .prepareListTasks() - .get() - .getTasks() - .stream() - .filter( - // Skip the tasks we that'd get in the way while debugging - t -> false == t.action().contains(TransportListTasksAction.TYPE.name()) - && false == t.action().contains(HealthNode.TASK_NAME) - ) - .toList() - ); - assertTrue("request must be failed or completed after clearing disruption", future.isDone()); - ensureBlocksReleased(); - logger.info("--> failed to execute esql query with disruption; retrying...", e); - EsqlTestUtils.assertEsqlFailure(e); } + // re-run the query return super.run(request); }