-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Support partial results in CCS in ES|QL #122708
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
b88b208
c6159ad
1252fba
6c4ec5a
261ed18
badef87
88ded0e
5daefe7
039c8fe
9283823
d3be47f
0b89a24
9e47401
e00080e
7624a8f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 122708 | ||
| summary: Support partial results in CCS in ES|QL | ||
| area: ES|QL | ||
| type: enhancement | ||
| issues: [] |
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,7 @@ | |
| import org.elasticsearch.action.ActionListenerResponseHandler; | ||
| import org.elasticsearch.action.OriginalIndices; | ||
| import org.elasticsearch.action.support.ChannelActionListener; | ||
| import org.elasticsearch.compute.operator.DriverProfile; | ||
| import org.elasticsearch.compute.operator.exchange.ExchangeService; | ||
| import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; | ||
| import org.elasticsearch.core.Releasable; | ||
|
|
@@ -34,10 +35,11 @@ | |
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.Set; | ||
| import java.util.concurrent.Executor; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
| import java.util.function.Function; | ||
|
|
||
| /** | ||
| * Manages computes across multiple clusters by sending {@link ClusterComputeRequest} to remote clusters and executing the computes. | ||
|
|
@@ -74,69 +76,107 @@ void startComputeOnRemoteCluster( | |
| RemoteCluster cluster, | ||
| Runnable cancelQueryOnFailure, | ||
| EsqlExecutionInfo executionInfo, | ||
| ActionListener<ComputeResponse> listener | ||
| ActionListener<List<DriverProfile>> listener | ||
| ) { | ||
| var queryPragmas = configuration.pragmas(); | ||
| listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close); | ||
| final var childSessionId = computeService.newChildSession(sessionId); | ||
| final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>(); | ||
| final String clusterAlias = cluster.clusterAlias(); | ||
| try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> { | ||
| var resp = finalResponse.get(); | ||
| return Objects.requireNonNullElseGet(resp, () -> new ComputeResponse(profiles)); | ||
| }))) { | ||
| ExchangeService.openExchange( | ||
| transportService, | ||
| cluster.connection, | ||
| childSessionId, | ||
| queryPragmas.exchangeBufferSize(), | ||
| esqlExecutor, | ||
| EsqlCCSUtils.skipUnavailableListener( | ||
| computeListener.acquireAvoid(), | ||
| executionInfo, | ||
| clusterAlias, | ||
| EsqlExecutionInfo.Cluster.Status.SKIPPED | ||
| ).delegateFailureAndWrap((l, unused) -> { | ||
| var listenerGroup = new RemoteListenerGroup( | ||
| transportService, | ||
| rootTask, | ||
| computeListener, | ||
| clusterAlias, | ||
| executionInfo, | ||
| l | ||
| ); | ||
|
|
||
| var remoteSink = exchangeService.newRemoteSink( | ||
| listenerGroup.getGroupTask(), | ||
| childSessionId, | ||
| transportService, | ||
| cluster.connection | ||
| ); | ||
| final AtomicInteger pagesFetched = new AtomicInteger(); | ||
| final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>(); | ||
| listener = listener.delegateResponse((l, e) -> { | ||
| final boolean receivedResults = finalResponse.get() != null || pagesFetched.get() > 0; | ||
| if (receivedResults == false && EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) { | ||
| EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e); | ||
| l.onResponse(List.of()); | ||
| } else if (configuration.allowPartialResults()) { | ||
| EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e); | ||
| l.onResponse(List.of()); | ||
| } else { | ||
| l.onFailure(e); | ||
| } | ||
| }); | ||
| ExchangeService.openExchange( | ||
| transportService, | ||
| cluster.connection, | ||
| childSessionId, | ||
| queryPragmas.exchangeBufferSize(), | ||
| esqlExecutor, | ||
| listener.delegateFailure((l, unused) -> { | ||
| final CancellableTask groupTask; | ||
| final Runnable onGroupFailure; | ||
| boolean failFast = executionInfo.isSkipUnavailable(clusterAlias) == false && configuration.allowPartialResults() == false; | ||
| if (failFast) { | ||
| groupTask = rootTask; | ||
| onGroupFailure = cancelQueryOnFailure; | ||
| } else { | ||
| groupTask = computeService.createGroupTask(rootTask); | ||
| onGroupFailure = computeService.cancelQueryOnFailure(groupTask); | ||
| l = ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask)); | ||
| } | ||
| try (var computeListener = new ComputeListener(transportService.getThreadPool(), onGroupFailure, l.map(profiles -> { | ||
| updateExecutionInfo(executionInfo, clusterAlias, finalResponse.get()); | ||
| return profiles; | ||
| }))) { | ||
| var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, cluster.connection); | ||
| exchangeSource.addRemoteSink( | ||
| remoteSink, | ||
| executionInfo.isSkipUnavailable(clusterAlias) == false, | ||
| () -> {}, | ||
| failFast, | ||
| pagesFetched::incrementAndGet, | ||
|
||
| queryPragmas.concurrentExchangeClients(), | ||
| listenerGroup.getExchangeRequestListener() | ||
| computeListener.acquireAvoid() | ||
| ); | ||
| var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices); | ||
| var clusterRequest = new ClusterComputeRequest(clusterAlias, childSessionId, configuration, remotePlan); | ||
| final ActionListener<ComputeResponse> clusterListener = listenerGroup.getClusterRequestListener().map(r -> { | ||
| final ActionListener<ComputeResponse> clusterListener = computeListener.acquireCompute().map(r -> { | ||
| finalResponse.set(r); | ||
| return r.getProfiles(); | ||
| }); | ||
| transportService.sendChildRequest( | ||
| cluster.connection, | ||
| ComputeService.CLUSTER_ACTION_NAME, | ||
| clusterRequest, | ||
| listenerGroup.getGroupTask(), | ||
| groupTask, | ||
| TransportRequestOptions.EMPTY, | ||
| new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor) | ||
| ); | ||
| }) | ||
| } | ||
| }) | ||
| ); | ||
| } | ||
|
|
||
| private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String clusterAlias, ComputeResponse resp) { | ||
| Function<EsqlExecutionInfo.Cluster.Status, EsqlExecutionInfo.Cluster.Status> runningToSuccess = status -> { | ||
| if (status != EsqlExecutionInfo.Cluster.Status.RUNNING) { | ||
| return status; | ||
| } else if (executionInfo.isStopped() || resp.failedShards > 0) { | ||
| return EsqlExecutionInfo.Cluster.Status.PARTIAL; | ||
| } else { | ||
| return EsqlExecutionInfo.Cluster.Status.SUCCESSFUL; | ||
| } | ||
| }; | ||
| if (resp.getTook() != null) { | ||
| var tookTime = TimeValue.timeValueNanos(executionInfo.planningTookTime().nanos() + resp.getTook().nanos()); | ||
| executionInfo.swapCluster( | ||
| clusterAlias, | ||
| (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus())) | ||
| .setTook(tookTime) | ||
| .setTotalShards(resp.getTotalShards()) | ||
| .setSuccessfulShards(resp.getSuccessfulShards()) | ||
| .setSkippedShards(resp.getSkippedShards()) | ||
| .setFailedShards(resp.getFailedShards()) | ||
| .build() | ||
| ); | ||
| } else { | ||
| // if the cluster is an older version and does not send back took time, then calculate it here on the coordinator | ||
| // and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response | ||
| executionInfo.swapCluster( | ||
| clusterAlias, | ||
| (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus())) | ||
| .setTook(executionInfo.tookSoFar()) | ||
smalyshev marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| .build() | ||
| ); | ||
| } | ||
|
|
||
| } | ||
|
|
||
| List<RemoteCluster> getRemoteClusters( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,8 +31,12 @@ | |
| import org.elasticsearch.search.internal.SearchContext; | ||
| import org.elasticsearch.search.lookup.SourceProvider; | ||
| import org.elasticsearch.tasks.CancellableTask; | ||
| import org.elasticsearch.tasks.Task; | ||
| import org.elasticsearch.tasks.TaskId; | ||
| import org.elasticsearch.tasks.TaskManager; | ||
| import org.elasticsearch.threadpool.ThreadPool; | ||
| import org.elasticsearch.transport.RemoteClusterAware; | ||
| import org.elasticsearch.transport.TransportRequest; | ||
| import org.elasticsearch.transport.TransportService; | ||
| import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; | ||
| import org.elasticsearch.xpack.esql.action.EsqlQueryAction; | ||
|
|
@@ -57,7 +61,6 @@ | |
| import java.util.Set; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
| import java.util.function.Function; | ||
| import java.util.function.Supplier; | ||
|
|
||
| import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME; | ||
|
|
@@ -211,7 +214,8 @@ public void execute( | |
| computeListener.acquireCompute().delegateFailure((l, profiles) -> { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Again, here I am not 100% sure of the intended logic - for a remote, it looks like if
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, the search_shards can fail when a node shuts down, and I've pushed d3be47f to handle this case. We also need to handle partial results for the final pipeline, but that's for a follow-up. |
||
| if (execInfo.clusterInfo.containsKey(LOCAL_CLUSTER)) { | ||
| var tookTime = TimeValue.timeValueNanos(System.nanoTime() - execInfo.getRelativeStartNanos()); | ||
| var status = localClusterWasInterrupted.get() | ||
| final Integer failedShards = execInfo.getCluster(LOCAL_CLUSTER).getFailedShards(); | ||
| var status = localClusterWasInterrupted.get() || (failedShards != null && failedShards > 0) | ||
| ? EsqlExecutionInfo.Cluster.Status.PARTIAL | ||
| : EsqlExecutionInfo.Cluster.Status.SUCCESSFUL; | ||
| execInfo.swapCluster( | ||
|
|
@@ -277,48 +281,13 @@ public void execute( | |
| cluster, | ||
| cancelQueryOnFailure, | ||
| execInfo, | ||
| computeListener.acquireCompute().map(r -> { | ||
| updateExecutionInfo(execInfo, cluster.clusterAlias(), r); | ||
| return r.getProfiles(); | ||
| }) | ||
| computeListener.acquireCompute() | ||
| ); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String clusterAlias, ComputeResponse resp) { | ||
| Function<EsqlExecutionInfo.Cluster.Status, EsqlExecutionInfo.Cluster.Status> runningToSuccess = status -> { | ||
| if (status == EsqlExecutionInfo.Cluster.Status.RUNNING) { | ||
| return executionInfo.isStopped() ? EsqlExecutionInfo.Cluster.Status.PARTIAL : EsqlExecutionInfo.Cluster.Status.SUCCESSFUL; | ||
| } else { | ||
| return status; | ||
| } | ||
| }; | ||
| if (resp.getTook() != null) { | ||
| var tookTime = TimeValue.timeValueNanos(executionInfo.planningTookTime().nanos() + resp.getTook().nanos()); | ||
| executionInfo.swapCluster( | ||
| clusterAlias, | ||
| (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus())) | ||
| .setTook(tookTime) | ||
| .setTotalShards(resp.getTotalShards()) | ||
| .setSuccessfulShards(resp.getSuccessfulShards()) | ||
| .setSkippedShards(resp.getSkippedShards()) | ||
| .setFailedShards(resp.getFailedShards()) | ||
| .build() | ||
| ); | ||
| } else { | ||
| // if the cluster is an older version and does not send back took time, then calculate it here on the coordinator | ||
| // and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response | ||
| executionInfo.swapCluster( | ||
| clusterAlias, | ||
| (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus())) | ||
| .setTook(executionInfo.tookSoFar()) | ||
| .build() | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| // For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries) | ||
| private static void updateShardCountForCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) { | ||
| if (execInfo.isCrossClusterSearch()) { | ||
|
|
@@ -444,4 +413,36 @@ Runnable cancelQueryOnFailure(CancellableTask task) { | |
| transportService.getTaskManager().cancelTaskAndDescendants(task, "cancelled on failure", false, ActionListener.noop()); | ||
| }); | ||
| } | ||
|
|
||
| CancellableTask createGroupTask(Task parentTask) { | ||
| final TaskManager taskManager = transportService.getTaskManager(); | ||
| return (CancellableTask) taskManager.register( | ||
| "transport", | ||
| "esql_compute_group", | ||
| new ComputeGroupTaskRequest( | ||
| parentTask.taskInfo(transportService.getLocalNode().getId(), false).taskId(), | ||
| parentTask::getDescription | ||
| ) | ||
| ); | ||
| } | ||
|
|
||
| private static class ComputeGroupTaskRequest extends TransportRequest { | ||
| private final Supplier<String> parentDescription; | ||
|
|
||
| ComputeGroupTaskRequest(TaskId parentTask, Supplier<String> description) { | ||
| this.parentDescription = description; | ||
| setParentTask(parentTask); | ||
| } | ||
|
|
||
| @Override | ||
| public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) { | ||
| assert parentTaskId.isSet(); | ||
| return new CancellableTask(id, type, action, "", parentTaskId, headers); | ||
| } | ||
|
|
||
| @Override | ||
| public String getDescription() { | ||
| return "group [" + parentDescription.get() + "]"; | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.