diff --git a/docs/changelog/121942.yaml b/docs/changelog/121942.yaml new file mode 100644 index 0000000000000..4973ebbb4f26c --- /dev/null +++ b/docs/changelog/121942.yaml @@ -0,0 +1,5 @@ +pr: 121942 +summary: Allow partial results in ES|QL +area: ES|QL +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 2322d7b3acc05..57a24562344de 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -192,6 +192,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_LOOKUP_JOIN_SOURCE_TEXT = def(9_008_0_00); public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR = def(9_009_0_00); public static final TransportVersion SLM_UNHEALTHY_IF_NO_SNAPSHOT_WITHIN = def(9_010_0_00); + public static final TransportVersion ESQL_SUPPORT_PARTIAL_RESULTS = def(9_011_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/ConfigurationTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/ConfigurationTestUtils.java index 39e79b33327a9..c7f51692e33e4 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/ConfigurationTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/ConfigurationTestUtils.java @@ -71,7 +71,8 @@ public static Configuration randomConfiguration(String query, Map start disruption scheme [{}]", disruptionScheme); disruptionScheme.startDisrupting(); logger.info("--> executing esql query with disruption {} ", request.query()); + if (randomBoolean()) { + request.allowPartialResults(randomBoolean()); + } ActionFuture future = client().execute(EsqlQueryAction.INSTANCE, request); + EsqlQueryResponse resp = null; try { - return future.actionGet(2, TimeUnit.MINUTES); + resp = future.actionGet(2, TimeUnit.MINUTES); + if (resp.isPartial() == false) { + return resp; + } } catch (Exception ignored) { } finally { clearDisruption(); } - try { - return future.actionGet(2, TimeUnit.MINUTES); - } catch (Exception e) { - logger.info( - "running tasks: {}", - client().admin() - .cluster() - .prepareListTasks() - .get() - .getTasks() - .stream() - .filter( - // Skip the tasks we that'd get in the way while debugging - t -> false == t.action().contains(TransportListTasksAction.TYPE.name()) - && false == t.action().contains(HealthNode.TASK_NAME) - ) - .toList() - ); - assertTrue("request must be failed or completed after clearing disruption", future.isDone()); - ensureBlocksReleased(); - logger.info("--> failed to execute esql query with disruption; retrying...", e); - EsqlTestUtils.assertEsqlFailure(e); - return client().execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES); + // wait for the response after clear disruption + if (resp == null) { + try { + resp = future.actionGet(2, TimeUnit.MINUTES); + } catch (Exception e) { + logger.info( + "running tasks: {}", + client().admin() + .cluster() + .prepareListTasks() + .get() + .getTasks() + .stream() + .filter( + // Skip the tasks we that'd get in the way while debugging + t -> false == t.action().contains(TransportListTasksAction.TYPE.name()) + && false == t.action().contains(HealthNode.TASK_NAME) + ) + .toList() + ); + assertTrue("request must be failed or completed after clearing disruption", 
future.isDone());
+                ensureBlocksReleased();
+                logger.info("--> failed to execute esql query with disruption; retrying...", e);
+                EsqlTestUtils.assertEsqlFailure(e);
+            }
+        }
+        // use the response if it's not partial
+        if (resp != null) {
+            if (resp.isPartial() == false) {
+                return resp;
+            }
+            try (var ignored = resp) {
+                assertTrue(request.allowPartialResults());
+            }
        }
+        // re-run the query
+        return super.run(request);
     }

     private ServiceDisruptionScheme addRandomDisruptionScheme() {
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java
index 1118121b0becb..d14de89430589 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java
@@ -16,13 +16,17 @@
 import org.elasticsearch.test.FailingFieldPlugin;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.json.JsonXContent;
+import org.elasticsearch.xpack.esql.EsqlTestUtils;

-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;

 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.in;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;

 /**
  * Make sure the failures on the data node come back as failures over the wire.
@@ -48,10 +52,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
         return settings;
     }

-    /**
-     * Use a runtime field that fails when loading field values to fail the entire query.
-     */
-    public void testFailureLoadingFields() throws IOException {
+    public Set<String> populateIndices() throws Exception {
         XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
         mapping.startObject("runtime");
         {
@@ -63,17 +64,62 @@ public void testFailureLoadingFields() throws IOException {
             mapping.endObject();
         }
         mapping.endObject();
-        client().admin().indices().prepareCreate("fail").setSettings(indexSettings(1, 0)).setMapping(mapping.endObject()).get();
-
-        int docCount = 50;
-        List<IndexRequestBuilder> docs = new ArrayList<>(docCount);
-        for (int d = 0; d < docCount; d++) {
-            docs.add(client().prepareIndex("ok").setSource("foo", d));
+        client().admin().indices().prepareCreate("fail").setMapping(mapping.endObject()).get();
+        int okCount = between(1, 50);
+        Set<String> okIds = new HashSet<>();
+        List<IndexRequestBuilder> docs = new ArrayList<>(okCount);
+        for (int d = 0; d < okCount; d++) {
+            String id = "ok-" + d;
+            okIds.add(id);
+            docs.add(client().prepareIndex("ok").setId(id).setSource("foo", d));
+        }
+        int failCount = between(1, 50);
+        for (int d = 0; d < failCount; d++) {
+            docs.add(client().prepareIndex("fail").setId("fail-" + d).setSource("foo", d));
         }
-        docs.add(client().prepareIndex("fail").setSource("foo", 0));
         indexRandom(true, docs);
+        return okIds;
+    }

+    /**
+     * Use a runtime field that fails when loading field values to fail the entire query.
+     */
+    public void testFailureLoadingFields() throws Exception {
+        populateIndices();
         IllegalStateException e = expectThrows(IllegalStateException.class, () -> run("FROM fail,ok | LIMIT 100").close());
         assertThat(e.getMessage(), equalTo("Accessing failing field"));
     }
+
+    public void testPartialResults() throws Exception {
+        Set<String> okIds = populateIndices();
+        {
+            EsqlQueryRequest request = new EsqlQueryRequest();
+            request.query("FROM fail,ok | LIMIT 100");
+            request.allowPartialResults(true);
+            request.pragmas(randomPragmas());
+            try (EsqlQueryResponse resp = run(request)) {
+                assertTrue(resp.isPartial());
+                List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
+                assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
+            }
+        }
+        {
+            EsqlQueryRequest request = new EsqlQueryRequest();
+            request.query("FROM fail,ok METADATA _id | KEEP _id, fail_me | LIMIT 100");
+            request.allowPartialResults(true);
+            request.pragmas(randomPragmas());
+            try (EsqlQueryResponse resp = run(request)) {
+                assertTrue(resp.isPartial());
+                List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
+                assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
+                Set<String> actualIds = new HashSet<>();
+                for (List<Object> row : rows) {
+                    assertThat(row.size(), equalTo(2));
+                    String id = (String) row.getFirst();
+                    assertThat(id, in(okIds));
+                    assertTrue(actualIds.add(id));
+                }
+            }
+        }
+    }
 }
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
index 57c8f033c8838..b6762a2563a5d 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
@@ -819,7 +819,12 @@ public enum Cap {
      * Fixes a series of issues with inlinestats which had an incomplete implementation after lookup and inlinestats
      * were refactored.
      */
-    INLINESTATS_V3(EsqlPlugin.INLINESTATS_FEATURE_FLAG);
+    INLINESTATS_V3(EsqlPlugin.INLINESTATS_FEATURE_FLAG),
+
+    /**
+     * Support partial_results
+     */
+    SUPPORT_PARTIAL_RESULTS;

     private final boolean enabled;
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java
index c97bd84a101cd..76a790b25c8d2 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java
@@ -246,7 +246,13 @@ public Map<String, Cluster> getClusters() {
      * @return the new Cluster object
      */
     public Cluster swapCluster(String clusterAlias, BiFunction<String, Cluster, Cluster> remappingFunction) {
-        return clusterInfo.compute(clusterAlias, remappingFunction);
+        return clusterInfo.compute(clusterAlias, (unused, oldCluster) -> {
+            final Cluster newCluster = remappingFunction.apply(clusterAlias, oldCluster);
+            if (newCluster != null && isPartial == false) {
+                isPartial = newCluster.isPartial();
+            }
+            return newCluster;
+        });
     }

     @Override
@@ -305,13 +311,6 @@ public boolean isPartial() {
         return isPartial;
     }

-    /**
-     * Mark the query as having partial results.
-     */
-    public void markAsPartial() {
-        isPartial = true;
-    }
-
     public void markAsStopped() {
         isStopped = true;
     }
@@ -320,13 +319,6 @@ public boolean isStopped() {
         return isStopped;
     }

-    /**
-     * Mark this cluster as having partial results.
- */ - public void markClusterAsPartial(String clusterAlias) { - swapCluster(clusterAlias, (k, v) -> new Cluster.Builder(v).setStatus(Cluster.Status.PARTIAL).build()); - } - /** * Represents the search metadata about a particular cluster involved in a cross-cluster search. * The Cluster object can represent either the local cluster or a remote cluster. @@ -618,6 +610,10 @@ public List getFailures() { return failures; } + boolean isPartial() { + return status == Status.PARTIAL || status == Status.SKIPPED || (failedShards != null && failedShards > 0); + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java index 239f9e2696f88..ee557930d1c23 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java @@ -52,6 +52,7 @@ public class EsqlQueryRequest extends org.elasticsearch.xpack.core.esql.action.E private boolean keepOnCompletion; private boolean onSnapshotBuild = Build.current().isSnapshot(); private boolean acceptedPragmaRisks = false; + private boolean allowPartialResults = false; /** * "Tables" provided in the request for use with things like {@code LOOKUP}. @@ -231,6 +232,14 @@ public Map> tables() { return tables; } + public boolean allowPartialResults() { + return allowPartialResults; + } + + public void allowPartialResults(boolean allowPartialResults) { + this.allowPartialResults = allowPartialResults; + } + @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { // Pass the query as the description diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java index 1a82bb9b2829d..a3e89716ee6aa 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java @@ -181,6 +181,10 @@ public boolean isAsync() { return isRunning; } + public boolean isPartial() { + return executionInfo != null && executionInfo.isPartial(); + } + public EsqlExecutionInfo getExecutionInfo() { return executionInfo; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RequestXContent.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RequestXContent.java index e77d7b41aaca6..a793f39e90ee3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RequestXContent.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RequestXContent.java @@ -85,6 +85,7 @@ String fields() { static final ParseField WAIT_FOR_COMPLETION_TIMEOUT = new ParseField("wait_for_completion_timeout"); static final ParseField KEEP_ALIVE = new ParseField("keep_alive"); static final ParseField KEEP_ON_COMPLETION = new ParseField("keep_on_completion"); + static final ParseField ALLOW_PARTIAL_RESULTS = new ParseField("allow_partial_results"); private static final ObjectParser SYNC_PARSER = objectParserSync(EsqlQueryRequest::syncEsqlQueryRequest); private static final ObjectParser ASYNC_PARSER = objectParserAsync(EsqlQueryRequest::asyncEsqlQueryRequest); @@ -114,6 +115,7 @@ 
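The request, response, and parser changes above add an opt-in flag rather than changing default behavior: allow_partial_results is parsed from the request body, carried on EsqlQueryRequest, and surfaced back through EsqlQueryResponse.isPartial(). A rough method-body sketch of how a caller might use it, illustrative only and not part of this change, built from the APIs touched in this diff; the index name and query are invented, and client()/actionGet() are the usual integration-test helpers:

// Illustrative sketch, not part of this patch; "events" is a hypothetical index.
EsqlQueryRequest request = new EsqlQueryRequest();
request.query("FROM events | LIMIT 100");
request.allowPartialResults(true); // defaults to false, so existing callers keep fail-fast behavior
try (EsqlQueryResponse resp = client().execute(EsqlQueryAction.INSTANCE, request).actionGet()) {
    if (resp.isPartial()) {
        // one or more shards or clusters failed; the returned rows may be incomplete
    }
}

Over REST the same flag is the allow_partial_results body field declared on the parser in the hunk below.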
private static void objectParserCommon(ObjectParser parser) parser.declareString((request, localeTag) -> request.locale(Locale.forLanguageTag(localeTag)), LOCALE_FIELD); parser.declareBoolean(EsqlQueryRequest::profile, PROFILE_FIELD); parser.declareField((p, r, c) -> new ParseTables(r, p).parseTables(), TABLES_FIELD, ObjectParser.ValueType.OBJECT); + parser.declareBoolean(EsqlQueryRequest::allowPartialResults, ALLOW_PARTIAL_RESULTS); } private static ObjectParser objectParserSync(Supplier supplier) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index ad93025bd4f2f..a9a3be7ecab1c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -209,7 +209,7 @@ public void execute( transportService.getThreadPool(), cancelQueryOnFailure, computeListener.acquireCompute().delegateFailure((l, profiles) -> { - if (execInfo.isCrossClusterSearch() && execInfo.clusterAliases().contains(LOCAL_CLUSTER)) { + if (execInfo.clusterInfo.containsKey(LOCAL_CLUSTER)) { var tookTime = TimeValue.timeValueNanos(System.nanoTime() - execInfo.getRelativeStartNanos()); var status = localClusterWasInterrupted.get() ? EsqlExecutionInfo.Cluster.Status.PARTIAL @@ -252,16 +252,14 @@ public void execute( cancelQueryOnFailure, localListener.acquireCompute().map(r -> { localClusterWasInterrupted.set(execInfo.isStopped()); - if (execInfo.isCrossClusterSearch() && execInfo.clusterAliases().contains(LOCAL_CLUSTER)) { - execInfo.swapCluster( - LOCAL_CLUSTER, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(r.getTotalShards()) - .setSuccessfulShards(r.getSuccessfulShards()) - .setSkippedShards(r.getSkippedShards()) - .setFailedShards(r.getFailedShards()) - .build() - ); - } + execInfo.swapCluster( + LOCAL_CLUSTER, + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(r.getTotalShards()) + .setSuccessfulShards(r.getSuccessfulShards()) + .setSkippedShards(r.getSkippedShards()) + .setFailedShards(r.getFailedShards()) + .build() + ); return r.getProfiles(); }) ); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java index ee5b192bf3285..45048c4a142f1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java @@ -97,7 +97,8 @@ void startComputeOnDataNodes( Runnable runOnTaskFailure, ActionListener outListener ) { - DataNodeRequestSender sender = new DataNodeRequestSender(transportService, esqlExecutor, parentTask) { + final boolean allowPartialResults = configuration.allowPartialResults(); + DataNodeRequestSender sender = new DataNodeRequestSender(transportService, esqlExecutor, parentTask, allowPartialResults) { @Override protected void sendRequest( DiscoveryNode node, @@ -125,14 +126,28 @@ protected void sendRequest( queryPragmas.exchangeBufferSize(), esqlExecutor, listener.delegateFailureAndWrap((l, unused) -> { + final Runnable onGroupFailure; + final CancellableTask groupTask; + if (allowPartialResults) { + groupTask = RemoteListenerGroup.createGroupTask( + transportService, + parentTask, + () -> "compute group: 
data-node [" + node.getName() + "], " + shardIds + " [" + shardIds + "]" + ); + onGroupFailure = computeService.cancelQueryOnFailure(groupTask); + l = ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask)); + } else { + groupTask = parentTask; + onGroupFailure = runOnTaskFailure; + } final AtomicReference nodeResponseRef = new AtomicReference<>(); try ( - var computeListener = new ComputeListener(threadPool, runOnTaskFailure, l.map(ignored -> nodeResponseRef.get())) + var computeListener = new ComputeListener(threadPool, onGroupFailure, l.map(ignored -> nodeResponseRef.get())) ) { - final var remoteSink = exchangeService.newRemoteSink(parentTask, childSessionId, transportService, connection); + final var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, connection); exchangeSource.addRemoteSink( remoteSink, - true, + allowPartialResults == false, pagesFetched::incrementAndGet, queryPragmas.concurrentExchangeClients(), computeListener.acquireAvoid() @@ -153,7 +168,7 @@ protected void sendRequest( connection, ComputeService.DATA_ACTION_NAME, dataNodeRequest, - parentTask, + groupTask, TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(computeListener.acquireCompute().map(r -> { nodeResponseRef.set(r); @@ -238,6 +253,7 @@ public void onFailure(Exception e) { } onResponse(List.of()); } else { + // TODO: add these to fatal failures so we can continue processing other shards. try { exchangeService.finishSinkHandler(request.sessionId(), e); } finally { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index 2d5b4169c0215..a5b8c13f9a730 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -55,6 +55,7 @@ abstract class DataNodeRequestSender { private final TransportService transportService; private final Executor esqlExecutor; private final CancellableTask rootTask; + private final boolean allowPartialResults; private final ReentrantLock sendingLock = new ReentrantLock(); private final Queue pendingShardIds = ConcurrentCollections.newQueue(); private final Map nodePermits = new HashMap<>(); @@ -62,10 +63,11 @@ abstract class DataNodeRequestSender { private final AtomicBoolean changed = new AtomicBoolean(); private boolean reportedFailure = false; // guarded by sendingLock - DataNodeRequestSender(TransportService transportService, Executor esqlExecutor, CancellableTask rootTask) { + DataNodeRequestSender(TransportService transportService, Executor esqlExecutor, CancellableTask rootTask, boolean allowPartialResults) { this.transportService = transportService; this.esqlExecutor = esqlExecutor; this.rootTask = rootTask; + this.allowPartialResults = allowPartialResults; } final void startComputeOnDataNodes( @@ -80,13 +82,14 @@ final void startComputeOnDataNodes( searchShards(rootTask, clusterAlias, requestFilter, concreteIndices, originalIndices, ActionListener.wrap(targetShards -> { try (var computeListener = new ComputeListener(transportService.getThreadPool(), runOnTaskFailure, listener.map(profiles -> { TimeValue took = TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos); + final int failedShards = shardFailures.size(); return new ComputeResponse( profiles, took, 
targetShards.totalShards(), - targetShards.totalShards(), + targetShards.totalShards() - failedShards, targetShards.skippedShards(), - 0 + failedShards ); }))) { for (TargetShard shard : targetShards.shards.values()) { @@ -120,7 +123,8 @@ private void trySendingRequestsForPendingShards(TargetShards targetShards, Compu ); } } - if (reportedFailure || shardFailures.values().stream().anyMatch(shardFailure -> shardFailure.fatal)) { + if (reportedFailure + || (allowPartialResults == false && shardFailures.values().stream().anyMatch(shardFailure -> shardFailure.fatal))) { reportedFailure = true; reportFailures(computeListener); } else { @@ -345,7 +349,7 @@ void searchShards( filter, null, null, - false, + true, // unavailable_shards will be handled by the sender clusterAlias ); transportService.sendChildRequest( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java index 54ab269bb91ae..3c6c13993520b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java @@ -50,7 +50,7 @@ class RemoteListenerGroup { this.taskManager = transportService.getTaskManager(); this.clusterAlias = clusterAlias; this.executionInfo = executionInfo; - groupTask = createGroupTask(rootTask, () -> rootTask.getDescription() + "[" + clusterAlias + "]"); + groupTask = createGroupTask(transportService, rootTask, () -> rootTask.getDescription() + "[" + clusterAlias + "]"); CountDown countDown = new CountDown(2); // The group is done when both the sink and the cluster request are done Runnable finishGroup = () -> { @@ -92,7 +92,8 @@ public ActionListener> getClusterRequestListener() { return clusterRequestListener; } - private CancellableTask createGroupTask(Task parentTask, Supplier description) { + public static CancellableTask createGroupTask(TransportService transportService, Task parentTask, Supplier description) { + final TaskManager taskManager = transportService.getTaskManager(); return (CancellableTask) taskManager.register( "transport", "esql_compute_group", diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 5b0dfa14014a6..baf351c27107c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -206,7 +206,8 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener { - // If we had any skipped or partial clusters, the result is partial - if (executionInfo.getClusters() - .values() - .stream() - .anyMatch( - c -> c.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED - || c.getStatus() == EsqlExecutionInfo.Cluster.Status.PARTIAL - )) { - executionInfo.markAsPartial(); - } recordCCSTelemetry(task, executionInfo, request, null); listener.onResponse(toResponse(task, request, configuration, result)); }, ex -> { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Configuration.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Configuration.java index 059ad175468a7..b08eabda21cfc 100644 --- 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Configuration.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Configuration.java @@ -50,6 +50,7 @@ public class Configuration implements Writeable { private final String query; private final boolean profile; + private final boolean allowPartialResults; private final Map> tables; private final long queryStartTimeNanos; @@ -65,7 +66,8 @@ public Configuration( String query, boolean profile, Map> tables, - long queryStartTimeNanos + long queryStartTimeNanos, + boolean allowPartialResults ) { this.zoneId = zi.normalized(); this.now = ZonedDateTime.now(Clock.tick(Clock.system(zoneId), Duration.ofNanos(1))); @@ -80,6 +82,7 @@ public Configuration( this.tables = tables; assert tables != null; this.queryStartTimeNanos = queryStartTimeNanos; + this.allowPartialResults = allowPartialResults; } public Configuration(BlockStreamInput in) throws IOException { @@ -107,6 +110,11 @@ public Configuration(BlockStreamInput in) throws IOException { } else { this.queryStartTimeNanos = -1; } + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_SUPPORT_PARTIAL_RESULTS)) { + this.allowPartialResults = in.readBoolean(); + } else { + this.allowPartialResults = false; + } } @Override @@ -131,6 +139,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) { out.writeLong(queryStartTimeNanos); } + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_SUPPORT_PARTIAL_RESULTS)) { + out.writeBoolean(allowPartialResults); + } } public ZoneId zoneId() { @@ -206,6 +217,13 @@ public boolean profile() { return profile; } + /** + * Whether this request can return partial results instead of failing fast on failures + */ + public boolean allowPartialResults() { + return allowPartialResults; + } + private static void writeQuery(StreamOutput out, String query) throws IOException { if (query.length() > QUERY_COMPRESS_THRESHOLD_CHARS) { // compare on chars to avoid UTF-8 encoding unless actually required out.writeBoolean(true); @@ -244,7 +262,8 @@ public boolean equals(Object o) { && Objects.equals(locale, that.locale) && Objects.equals(that.query, query) && profile == that.profile - && tables.equals(that.tables); + && tables.equals(that.tables) + && allowPartialResults == that.allowPartialResults; } @Override @@ -260,7 +279,8 @@ public int hashCode() { locale, query, profile, - tables + tables, + allowPartialResults ); } @@ -282,6 +302,9 @@ public String toString() { + profile + ", tables=" + tables + + "allow_partial_result=" + + allowPartialResults + '}'; } + } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/AbstractConfigurationFunctionTestCase.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/AbstractConfigurationFunctionTestCase.java index a3a18d7a30b59..8cc9091dae90d 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/AbstractConfigurationFunctionTestCase.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/AbstractConfigurationFunctionTestCase.java @@ -43,7 +43,8 @@ static Configuration randomConfiguration() { StringUtils.EMPTY, randomBoolean(), Map.of(), - System.nanoTime() + System.nanoTime(), + randomBoolean() ); } diff --git 
a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToLowerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToLowerTests.java index f779dd038454d..a846ab7cb4aa7 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToLowerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToLowerTests.java @@ -70,7 +70,8 @@ private Configuration randomLocaleConfig() { "", false, Map.of(), - System.nanoTime() + System.nanoTime(), + randomBoolean() ); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToUpperTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToUpperTests.java index 3957c2e1fb2c0..da963f76b8dd6 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToUpperTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToUpperTests.java @@ -70,7 +70,8 @@ private Configuration randomLocaleConfig() { "", false, Map.of(), - System.nanoTime() + System.nanoTime(), + randomBoolean() ); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EvalMapperTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EvalMapperTests.java index 6dc1eac2e5814..3ea56d6d15abd 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EvalMapperTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EvalMapperTests.java @@ -78,7 +78,8 @@ public class EvalMapperTests extends ESTestCase { StringUtils.EMPTY, false, Map.of(), - System.nanoTime() + System.nanoTime(), + false ); @ParametersFactory(argumentFormatting = "%1$s") diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java index 7e5143d5a3ac0..d753045bee7cf 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java @@ -202,7 +202,8 @@ private Configuration config() { StringUtils.EMPTY, false, Map.of(), - System.nanoTime() + System.nanoTime(), + randomBoolean() ); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java index e181d9bb34955..e34deb4d55f20 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java @@ -52,6 +52,8 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.in; +import static org.hamcrest.Matchers.not; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -88,7 +90,11 @@ public void shutdownThreadPool() throws Exception { } public void testEmpty() { - var future = sendRequests(List.of(), (node, shardIds, 
aliasFilters, listener) -> fail("expect no data-node request is sent")); + var future = sendRequests( + List.of(), + randomBoolean(), + (node, shardIds, aliasFilters, listener) -> fail("expect no data-node request is sent") + ); var resp = safeGet(future); assertThat(resp.totalShards, equalTo(0)); } @@ -101,7 +107,7 @@ public void testOnePass() { targetShard(shard4, node2, node3) ); Queue sent = ConcurrentCollections.newQueue(); - var future = sendRequests(targetShards, (node, shardIds, aliasFilters, listener) -> { + var future = sendRequests(targetShards, randomBoolean(), (node, shardIds, aliasFilters, listener) -> { sent.add(new NodeRequest(node, shardIds, aliasFilters)); var resp = new DataNodeComputeResponse(List.of(), Map.of()); runWithDelay(() -> listener.onResponse(resp)); @@ -112,12 +118,25 @@ public void testOnePass() { } public void testMissingShards() { - var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3)); - var future = sendRequests(targetShards, (node, shardIds, aliasFilters, listener) -> { - fail("expect no data-node request is sent when target shards are missing"); - }); - var error = expectThrows(NoShardAvailableActionException.class, future::actionGet); - assertThat(error.getMessage(), containsString("no shard copies found")); + { + var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3)); + var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> { + fail("expect no data-node request is sent when target shards are missing"); + }); + var error = expectThrows(NoShardAvailableActionException.class, future::actionGet); + assertThat(error.getMessage(), containsString("no shard copies found")); + } + { + var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3)); + var future = sendRequests(targetShards, true, (node, shardIds, aliasFilters, listener) -> { + assertThat(shard3, not(in(shardIds))); + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()))); + }); + ComputeResponse resp = safeGet(future); + assertThat(resp.totalShards, equalTo(3)); + assertThat(resp.failedShards, equalTo(1)); + assertThat(resp.successfulShards, equalTo(2)); + } } public void testRetryThenSuccess() { @@ -129,7 +148,7 @@ public void testRetryThenSuccess() { targetShard(shard5, node1, node3, node2) ); Queue sent = ConcurrentCollections.newQueue(); - var future = sendRequests(targetShards, (node, shardIds, aliasFilters, listener) -> { + var future = sendRequests(targetShards, randomBoolean(), (node, shardIds, aliasFilters, listener) -> { sent.add(new NodeRequest(node, shardIds, aliasFilters)); Map failures = new HashMap<>(); if (node.equals(node1) && shardIds.contains(shard5)) { @@ -161,7 +180,7 @@ public void testRetryButFail() { targetShard(shard5, node1, node3, node2) ); Queue sent = ConcurrentCollections.newQueue(); - var future = sendRequests(targetShards, (node, shardIds, aliasFilters, listener) -> { + var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> { sent.add(new NodeRequest(node, shardIds, aliasFilters)); Map failures = new HashMap<>(); if (shardIds.contains(shard5)) { @@ -187,7 +206,7 @@ public void testDoNotRetryOnRequestLevelFailure() { var targetShards = List.of(targetShard(shard1, node1), targetShard(shard2, node2), targetShard(shard3, node1)); Queue sent = ConcurrentCollections.newQueue(); AtomicBoolean failed = new 
AtomicBoolean(); - var future = sendRequests(targetShards, (node, shardIds, aliasFilters, listener) -> { + var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> { sent.add(new NodeRequest(node, shardIds, aliasFilters)); if (node1.equals(node) && failed.compareAndSet(false, true)) { runWithDelay(() -> listener.onFailure(new IOException("test request level failure"), true)); @@ -203,6 +222,28 @@ public void testDoNotRetryOnRequestLevelFailure() { assertThat(firstRound, equalTo(Map.of(node1, List.of(shard1, shard3), node2, List.of(shard2)))); } + public void testAllowPartialResults() { + var targetShards = List.of(targetShard(shard1, node1), targetShard(shard2, node2), targetShard(shard3, node1, node2)); + Queue sent = ConcurrentCollections.newQueue(); + AtomicBoolean failed = new AtomicBoolean(); + var future = sendRequests(targetShards, true, (node, shardIds, aliasFilters, listener) -> { + sent.add(new NodeRequest(node, shardIds, aliasFilters)); + if (node1.equals(node) && failed.compareAndSet(false, true)) { + runWithDelay(() -> listener.onFailure(new IOException("test request level failure"), true)); + } else { + runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of()))); + } + }); + ComputeResponse resp = safeGet(future); + // one round: {node-1, node-2} + assertThat(sent.size(), equalTo(2)); + var firstRound = groupRequests(sent, 2); + assertThat(firstRound, equalTo(Map.of(node1, List.of(shard1, shard3), node2, List.of(shard2)))); + assertThat(resp.totalShards, equalTo(3)); + assertThat(resp.failedShards, equalTo(2)); + assertThat(resp.successfulShards, equalTo(1)); + } + static DataNodeRequestSender.TargetShard targetShard(ShardId shardId, DiscoveryNode... nodes) { return new DataNodeRequestSender.TargetShard(shardId, new ArrayList<>(Arrays.asList(nodes)), null); } @@ -224,7 +265,11 @@ void runWithDelay(Runnable runnable) { } } - PlainActionFuture sendRequests(List shards, Sender sender) { + PlainActionFuture sendRequests( + List shards, + boolean allowPartialResults, + Sender sender + ) { PlainActionFuture future = new PlainActionFuture<>(); TransportService transportService = mock(TransportService.class); when(transportService.getThreadPool()).thenReturn(threadPool); @@ -236,7 +281,7 @@ PlainActionFuture sendRequests(List
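The execution-info changes are easiest to judge from the consumer side: a cluster now counts as partial when its status is PARTIAL or SKIPPED or it reports failed shards, and swapCluster() folds that into the query-level flag. A rough method-body sketch of inspecting such a response, illustrative only and not part of this change; run(request) stands for the integration-test helper used earlier, logger for any test logger, and only accessors shown in this diff are used:

// Illustrative sketch, not part of this patch.
try (EsqlQueryResponse resp = run(request)) {
    if (resp.isPartial()) {
        EsqlExecutionInfo info = resp.getExecutionInfo();
        for (Map.Entry<String, EsqlExecutionInfo.Cluster> entry : info.getClusters().entrySet()) {
            EsqlExecutionInfo.Cluster cluster = entry.getValue();
            // PARTIAL or SKIPPED status, or a non-zero failed-shard count, marks the cluster as partial
            logger.info("cluster [{}] status [{}] failures {}", entry.getKey(), cluster.getStatus(), cluster.getFailures());
        }
    }
}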