Skip to content

Commit a8662d8

Browse files
committed
Implement runtime skip_unavailable=true
1 parent 0d2db06 commit a8662d8

File tree

4 files changed

+212
-22
lines changed

4 files changed

+212
-22
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.transport.TransportRequestHandler;
2626
import org.elasticsearch.transport.TransportRequestOptions;
2727
import org.elasticsearch.transport.TransportService;
28+
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
2829
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
2930
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
3031
import org.elasticsearch.xpack.esql.session.Configuration;
@@ -37,6 +38,8 @@
3738
import java.util.concurrent.Executor;
3839
import java.util.concurrent.atomic.AtomicReference;
3940

41+
import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards;
42+
4043
/**
4144
* Manages computes across multiple clusters by sending {@link ClusterComputeRequest} to remote clusters and executing the computes.
4245
* This handler delegates the execution of computes on data nodes within each remote cluster to {@link DataNodeComputeHandler}.
@@ -71,34 +74,51 @@ void startComputeOnRemoteCluster(
7174
ExchangeSourceHandler exchangeSource,
7275
RemoteCluster cluster,
7376
Runnable cancelQueryOnFailure,
77+
EsqlExecutionInfo executionInfo,
7478
ActionListener<ComputeResponse> listener
7579
) {
7680
var queryPragmas = configuration.pragmas();
7781
listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close);
7882
final var childSessionId = computeService.newChildSession(sessionId);
7983
final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>();
84+
final String clusterAlias = cluster.clusterAlias();
8085
try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> {
8186
var resp = finalResponse.get();
8287
return Objects.requireNonNullElseGet(resp, () -> new ComputeResponse(profiles));
8388
}))) {
89+
var openExchangeListener = computeListener.acquireAvoid();
8490
ExchangeService.openExchange(
8591
transportService,
8692
cluster.connection,
8793
childSessionId,
8894
queryPragmas.exchangeBufferSize(),
8995
esqlExecutor,
90-
computeListener.acquireCompute().delegateFailureAndWrap((l, unused) -> {
91-
var remoteSink = exchangeService.newRemoteSink(rootTask, childSessionId, transportService, cluster.connection);
96+
ActionListener.wrap(unused -> {
97+
var listenerGroup = new RemoteListenerGroup(
98+
transportService,
99+
rootTask,
100+
computeListener,
101+
clusterAlias,
102+
executionInfo,
103+
openExchangeListener
104+
);
105+
106+
var remoteSink = exchangeService.newRemoteSink(
107+
listenerGroup.getGroupTask(),
108+
childSessionId,
109+
transportService,
110+
cluster.connection
111+
);
92112
exchangeSource.addRemoteSink(
93113
remoteSink,
94-
true,
114+
executionInfo.isSkipUnavailable(clusterAlias) == false,
95115
() -> {},
96116
queryPragmas.concurrentExchangeClients(),
97-
computeListener.acquireAvoid()
117+
listenerGroup.getExchangeRequestListener()
98118
);
99119
var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
100-
var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, childSessionId, configuration, remotePlan);
101-
final ActionListener<ComputeResponse> clusterListener = l.map(r -> {
120+
var clusterRequest = new ClusterComputeRequest(clusterAlias, childSessionId, configuration, remotePlan);
121+
final ActionListener<ComputeResponse> clusterListener = listenerGroup.getClusterRequestListener().map(r -> {
102122
finalResponse.set(r);
103123
return r.getProfiles();
104124
});
@@ -110,9 +130,17 @@ void startComputeOnRemoteCluster(
110130
TransportRequestOptions.EMPTY,
111131
new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor)
112132
);
133+
}, e -> {
134+
if (executionInfo.isSkipUnavailable(clusterAlias)) {
135+
markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e);
136+
openExchangeListener.onResponse(null);
137+
} else {
138+
openExchangeListener.onFailure(e);
139+
}
113140
})
114141
);
115142
}
143+
116144
}
117145

118146
List<RemoteCluster> getRemoteClusters(

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import java.util.function.Supplier;
6262

6363
import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME;
64+
import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards;
6465

6566
/**
6667
* Computes the result of a {@link PhysicalPlan}.
@@ -268,6 +269,8 @@ public void execute(
268269
// starts computes on remote clusters
269270
final var remoteClusters = clusterComputeHandler.getRemoteClusters(clusterToConcreteIndices, clusterToOriginalIndices);
270271
for (ClusterComputeHandler.RemoteCluster cluster : remoteClusters) {
272+
var remoteListener = computeListener.acquireCompute();
273+
String clusterAlias = cluster.clusterAlias();
271274
clusterComputeHandler.startComputeOnRemoteCluster(
272275
sessionId,
273276
rootTask,
@@ -276,16 +279,28 @@ public void execute(
276279
exchangeSource,
277280
cluster,
278281
cancelQueryOnFailure,
279-
computeListener.acquireCompute().map(r -> {
280-
updateExecutionInfo(execInfo, cluster.clusterAlias(), r);
281-
return r.getProfiles();
282+
execInfo,
283+
ActionListener.wrap((ComputeResponse r) -> {
284+
updateExecutionInfo(execInfo, clusterAlias, r);
285+
remoteListener.onResponse(r.getProfiles());
286+
}, e -> {
287+
if (shouldIgnoreRemoteError(clusterAlias, e)) {
288+
markClusterWithFinalStateAndNoShards(execInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e);
289+
remoteListener.onResponse(Collections.emptyList());
290+
} else {
291+
remoteListener.onFailure(e);
292+
}
282293
})
283294
);
284295
}
285296
}
286297
}
287298
}
288299

300+
private boolean shouldIgnoreRemoteError(String clusterAlias, Exception e) {
301+
return true;
302+
}
303+
289304
private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String clusterAlias, ComputeResponse resp) {
290305
Function<EsqlExecutionInfo.Cluster.Status, EsqlExecutionInfo.Cluster.Status> runningToSuccess = status -> {
291306
if (status == EsqlExecutionInfo.Cluster.Status.RUNNING) {
@@ -309,11 +324,10 @@ private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String cluster
309324
} else {
310325
// if the cluster is an older version and does not send back took time, then calculate it here on the coordinator
311326
// and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response
312-
var tookTime = TimeValue.timeValueNanos(System.nanoTime() - executionInfo.getRelativeStartNanos());
313327
executionInfo.swapCluster(
314328
clusterAlias,
315329
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus()))
316-
.setTook(tookTime)
330+
.setTook(executionInfo.tookSoFar())
317331
.build()
318332
);
319333
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.plugin;
9+
10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.common.util.concurrent.CountDown;
12+
import org.elasticsearch.compute.operator.DriverProfile;
13+
import org.elasticsearch.tasks.CancellableTask;
14+
import org.elasticsearch.tasks.Task;
15+
import org.elasticsearch.tasks.TaskId;
16+
import org.elasticsearch.tasks.TaskManager;
17+
import org.elasticsearch.transport.TransportRequest;
18+
import org.elasticsearch.transport.TransportService;
19+
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
20+
21+
import java.util.List;
22+
import java.util.Map;
23+
import java.util.function.Supplier;
24+
25+
import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards;
26+
27+
// Create group task for this cluster. This group task ensures that two branches of the computation:
28+
// the exchange sink and the cluster request, belong to the same group and each of them can cancel the other.
29+
// runAfter listeners below ensure that the group is finalized when both branches are done.
30+
// The group task is the child of the root task, so if the root task is cancelled, the group task is cancelled too.
31+
class RemoteListenerGroup {
32+
private final CancellableTask groupTask;
33+
private final ActionListener<Void> exchangeRequestListener;
34+
private final ActionListener<List<DriverProfile>> clusterRequestListener;
35+
private final TaskManager taskManager;
36+
private final String clusterAlias;
37+
private final EsqlExecutionInfo executionInfo;
38+
private final TransportService transportService;
39+
40+
RemoteListenerGroup(
41+
TransportService transportService,
42+
Task rootTask,
43+
ComputeListener computeListener,
44+
String clusterAlias,
45+
EsqlExecutionInfo executionInfo,
46+
ActionListener<Void> delegate
47+
) {
48+
this.transportService = transportService;
49+
this.taskManager = transportService.getTaskManager();
50+
this.clusterAlias = clusterAlias;
51+
this.executionInfo = executionInfo;
52+
groupTask = createGroupTask(rootTask, () -> rootTask.getDescription() + "[" + clusterAlias + "]");
53+
CountDown countDown = new CountDown(2);
54+
// The group is done when both the sink and the cluster request are done
55+
Runnable finishGroup = () -> {
56+
if (countDown.countDown()) {
57+
taskManager.unregister(groupTask);
58+
delegate.onResponse(null);
59+
}
60+
};
61+
// Cancel the group on sink failure
62+
exchangeRequestListener = createCancellingListener("exchange sink failure", computeListener.acquireAvoid(), finishGroup);
63+
64+
// Cancel the group on cluster request failure
65+
clusterRequestListener = createCancellingListener("exchange cluster action failure", computeListener.acquireCompute(), finishGroup);
66+
}
67+
68+
/**
69+
* Create a listener that:
70+
* 1. Cancels the group task on failure
71+
* 2. Marks the cluster as partial if the error is ignorable, otherwise propagates the error
72+
*/
73+
private <T> ActionListener<T> createCancellingListener(String reason, ActionListener<T> delegate, Runnable finishGroup) {
74+
return ActionListener.runAfter(delegate.delegateResponse((inner, e) -> {
75+
taskManager.cancelTaskAndDescendants(groupTask, reason, true, ActionListener.running(() -> {
76+
if (shouldIgnoreRemoteError(clusterAlias, e)) {
77+
markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e);
78+
delegate.onResponse(null);
79+
} else {
80+
delegate.onFailure(e);
81+
}
82+
}));
83+
}), finishGroup);
84+
}
85+
86+
private boolean shouldIgnoreRemoteError(String clusterAlias, Exception e) {
87+
return executionInfo.isSkipUnavailable(clusterAlias);
88+
}
89+
90+
public CancellableTask getGroupTask() {
91+
return groupTask;
92+
}
93+
94+
public ActionListener<Void> getExchangeRequestListener() {
95+
return exchangeRequestListener;
96+
}
97+
98+
public ActionListener<List<DriverProfile>> getClusterRequestListener() {
99+
return clusterRequestListener;
100+
}
101+
102+
private CancellableTask createGroupTask(Task parentTask, Supplier<String> description) {
103+
return (CancellableTask) taskManager.register(
104+
"transport",
105+
"esql_compute_group",
106+
new ComputeGroupTaskRequest(parentTask.taskInfo(transportService.getLocalNode().getId(), false).taskId(), description)
107+
);
108+
}
109+
110+
private static class ComputeGroupTaskRequest extends TransportRequest {
111+
private final Supplier<String> parentDescription;
112+
113+
ComputeGroupTaskRequest(TaskId parentTask, Supplier<String> description) {
114+
this.parentDescription = description;
115+
setParentTask(parentTask);
116+
}
117+
118+
@Override
119+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
120+
assert parentTaskId.isSet();
121+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
122+
}
123+
124+
@Override
125+
public String getDescription() {
126+
return "group [" + parentDescription.get() + "]";
127+
}
128+
}
129+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.action.support.IndicesOptions;
1616
import org.elasticsearch.common.Strings;
1717
import org.elasticsearch.common.util.set.Sets;
18+
import org.elasticsearch.core.Nullable;
1819
import org.elasticsearch.core.TimeValue;
1920
import org.elasticsearch.indices.IndicesExpressionGrouper;
2021
import org.elasticsearch.license.XPackLicenseState;
@@ -25,6 +26,7 @@
2526
import org.elasticsearch.transport.RemoteTransportException;
2627
import org.elasticsearch.xpack.esql.VerificationException;
2728
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
29+
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo.Cluster;
2830
import org.elasticsearch.xpack.esql.analysis.Analyzer;
2931
import org.elasticsearch.xpack.esql.analysis.TableInfo;
3032
import org.elasticsearch.xpack.esql.index.IndexResolution;
@@ -37,7 +39,7 @@
3739
import java.util.Map;
3840
import java.util.Set;
3941

40-
class EsqlSessionCCSUtils {
42+
public class EsqlSessionCCSUtils {
4143

4244
private EsqlSessionCCSUtils() {}
4345

@@ -171,16 +173,7 @@ static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInf
171173
entry.getValue().getException()
172174
);
173175
if (skipUnavailable) {
174-
execInfo.swapCluster(
175-
clusterAlias,
176-
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED)
177-
.setTotalShards(0)
178-
.setSuccessfulShards(0)
179-
.setSkippedShards(0)
180-
.setFailedShards(0)
181-
.setFailures(List.of(new ShardSearchFailure(e)))
182-
.build()
183-
);
176+
markClusterWithFinalStateAndNoShards(execInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e);
184177
} else {
185178
throw e;
186179
}
@@ -338,4 +331,30 @@ public static void checkForCcsLicense(
338331
}
339332
}
340333
}
334+
335+
/**
336+
* Mark cluster with a default cluster state with the given status and potentially failure from exception.
337+
* Most metrics are set to 0 except for "took" which is set to the total time taken so far.
338+
* The status must be the final state of the cluster, not RUNNING.
339+
*/
340+
public static void markClusterWithFinalStateAndNoShards(
341+
EsqlExecutionInfo executionInfo,
342+
String clusterAlias,
343+
Cluster.Status status,
344+
@Nullable Exception ex
345+
) {
346+
assert status != Cluster.Status.RUNNING : "status must be a final state, not RUNNING";
347+
executionInfo.swapCluster(clusterAlias, (k, v) -> {
348+
Cluster.Builder builder = new Cluster.Builder(v).setStatus(status)
349+
.setTook(executionInfo.tookSoFar())
350+
.setTotalShards(0)
351+
.setSuccessfulShards(0)
352+
.setSkippedShards(0)
353+
.setFailedShards(0);
354+
if (ex != null) {
355+
builder.setFailures(List.of(new ShardSearchFailure(ex)));
356+
}
357+
return builder.build();
358+
});
359+
}
341360
}

0 commit comments

Comments
 (0)