Skip to content

Commit 514e22d

Browse files
committed
Ensure remote pipeline early termination
1 parent b53e1f1 commit 514e22d

File tree

9 files changed

+70
-19
lines changed

9 files changed

+70
-19
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ static TransportVersion def(int id) {
142142
public static final TransportVersion SOURCE_MODE_TELEMETRY = def(8_802_00_0);
143143
public static final TransportVersion NEW_REFRESH_CLUSTER_BLOCK = def(8_803_00_0);
144144
public static final TransportVersion RETRIES_AND_OPERATIONS_IN_BLOBSTORE_STATS = def(8_804_00_0);
145+
public static final TransportVersion COMPUTE_RESPONSE_PARTIAL = def(8_805_00_0);
145146

146147
/*
147148
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
package org.elasticsearch.compute.operator;
99

10+
import org.apache.logging.log4j.LogManager;
11+
import org.apache.logging.log4j.Logger;
1012
import org.elasticsearch.action.ActionListener;
1113
import org.elasticsearch.action.support.ContextPreservingActionListener;
1214
import org.elasticsearch.action.support.SubscribableListener;
@@ -174,6 +176,8 @@ public DriverContext driverContext() {
174176
return driverContext;
175177
}
176178

179+
private static final Logger LOGGER = LogManager.getLogger(Driver.class);
180+
177181
/**
178182
* Runs computations on the chain of operators for a given maximum amount of time or iterations.
179183
* Returns a blocked future when the chain of operators is blocked, allowing the caller
@@ -239,6 +243,7 @@ private void checkForEarlyTermination() throws DriverEarlyTerminationException {
239243
for (int i = activeOperators.size() - 2; i >= 0; i--) {
240244
Operator op = activeOperators.get(i);
241245
if (op.isFinished() == false) {
246+
LOGGER.debug("Early terminated!");
242247
throw new DriverEarlyTerminationException();
243248
}
244249
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.compute.operator.exchange;
99

10+
import org.apache.lucene.util.SetOnce;
1011
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.action.support.SubscribableListener;
1213
import org.elasticsearch.compute.data.BlockFactory;
@@ -41,6 +42,7 @@ public final class ExchangeSinkHandler {
4142
private final LongSupplier nowInMillis;
4243
private final AtomicLong lastUpdatedInMillis;
4344
private final BlockFactory blockFactory;
45+
private final SetOnce<ExchangeSourceHandler> source = new SetOnce<>();
4446

4547
public ExchangeSinkHandler(BlockFactory blockFactory, int maxBufferSize, LongSupplier nowInMillis) {
4648
this.blockFactory = blockFactory;
@@ -98,6 +100,10 @@ public IsBlockedResult waitForWriting() {
98100
public void fetchPageAsync(boolean sourceFinished, ActionListener<ExchangeResponse> listener) {
99101
if (sourceFinished) {
100102
buffer.finish(true);
103+
var subSource = source.get();
104+
if (subSource != null) {
105+
subSource.finishEarly(true, ActionListener.noop());
106+
}
101107
}
102108
listeners.add(listener);
103109
onChanged();
@@ -150,6 +156,10 @@ private void notifyListeners() {
150156
}
151157
}
152158

159+
public void setSource(ExchangeSourceHandler sub) {
160+
source.set(sub);
161+
}
162+
153163
/**
154164
* Create a new exchange sink for exchanging data
155165
*

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionLi
8585
}
8686

8787
public void onFinishEarly(Runnable finishEarlyHandler) {
88+
// TODO: not sure this is the best way but we need to know when the exchange source is finished early to set exec info
8889
this.finishEarlyHandler = finishEarlyHandler;
8990
}
9091

@@ -320,6 +321,10 @@ public void finishEarly(boolean drainingPages, ActionListener<Void> listener) {
320321
finishEarlyHandler.run();
321322
}
322323
buffer.finish(drainingPages);
324+
if (remoteSinks.isEmpty()) {
325+
listener.onResponse(null);
326+
return;
327+
}
323328
try (EsqlRefCountingListener refs = new EsqlRefCountingListener(listener)) {
324329
for (RemoteSink remoteSink : remoteSinks.values()) {
325330
remoteSink.close(refs.acquire());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.core.Predicates;
1919
import org.elasticsearch.core.TimeValue;
2020
import org.elasticsearch.rest.action.RestActions;
21+
import org.elasticsearch.transport.NoSuchRemoteClusterException;
2122
import org.elasticsearch.transport.RemoteClusterAware;
2223
import org.elasticsearch.transport.RemoteClusterService;
2324
import org.elasticsearch.xcontent.ParseField;
@@ -189,7 +190,7 @@ public Set<String> clusterAliases() {
189190
/**
190191
* @param clusterAlias to lookup skip_unavailable from
191192
* @return skip_unavailable setting (true/false)
192-
* @throws org.elasticsearch.transport.NoSuchRemoteClusterException if clusterAlias is unknown to this node's RemoteClusterService
193+
* @throws NoSuchRemoteClusterException if clusterAlias is unknown to this node's RemoteClusterService
193194
*/
194195
public boolean isSkipUnavailable(String clusterAlias) {
195196
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) {
@@ -279,10 +280,20 @@ public boolean isPartial() {
279280
return isPartial;
280281
}
281282

282-
public void setPartial() {
283+
/**
284+
* Mark the query as having partial results.
285+
*/
286+
public void markAsPartial() {
283287
isPartial = true;
284288
}
285289

290+
/**
291+
* Mark this cluster as having partial results.
292+
*/
293+
public void markClusterAsPartial(String clusterAlias) {
294+
swapCluster(clusterAlias, (k, v) -> new Cluster.Builder(v).setStatus(Cluster.Status.PARTIAL).build());
295+
}
296+
286297
/**
287298
* Represents the search metadata about a particular cluster involved in a cross-cluster search.
288299
* The Cluster object can represent either the local cluster or a remote cluster.

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,8 @@ private ComputeListener(
119119
cluster.getTotalShards(),
120120
cluster.getSuccessfulShards(),
121121
cluster.getSkippedShards(),
122-
cluster.getFailedShards()
122+
cluster.getFailedShards(),
123+
cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.PARTIAL
123124
);
124125
} else {
125126
result = new ComputeResponse(collectedProfiles.isEmpty() ? List.of() : collectedProfiles.stream().toList());
@@ -135,10 +136,14 @@ private ComputeListener(
135136

136137
private static void setFinalStatusAndShardCounts(String clusterAlias, EsqlExecutionInfo executionInfo) {
137138
executionInfo.swapCluster(clusterAlias, (k, v) -> {
138-
// TODO: once PARTIAL status is supported (partial results work to come), modify this code as needed
139139
if (v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
140140
assert v.getTotalShards() != null && v.getSkippedShards() != null : "Null total or skipped shard count: " + v;
141-
return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)
141+
EsqlExecutionInfo.Cluster.Status newStatus = v.getStatus();
142+
// Do not update the status if it is already set to e.g. PARTIAL
143+
if (newStatus == EsqlExecutionInfo.Cluster.Status.RUNNING) {
144+
newStatus = EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
145+
}
146+
return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(newStatus)
142147
/*
143148
* Total and skipped shard counts are set early in execution (after can-match).
144149
* Until ES|QL supports shard-level partial results, we just set all non-skipped shards
@@ -244,15 +249,16 @@ ActionListener<ComputeResponse> acquireCompute(@Nullable String computeClusterAl
244249

245250
private void updateExecutionInfoWithRemoteResponse(String computeClusterAlias, ComputeResponse resp) {
246251
TimeValue tookOnCluster;
252+
EsqlExecutionInfo.Cluster.Status resultStatus = resp.isPartial()
253+
? EsqlExecutionInfo.Cluster.Status.PARTIAL
254+
: EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
247255
if (resp.getTook() != null) {
248256
TimeValue remoteExecutionTime = resp.getTook();
249257
TimeValue planningTookTime = esqlExecutionInfo.planningTookTime();
250258
tookOnCluster = new TimeValue(planningTookTime.nanos() + remoteExecutionTime.nanos(), TimeUnit.NANOSECONDS);
251259
esqlExecutionInfo.swapCluster(
252260
computeClusterAlias,
253-
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v)
254-
// for now ESQL doesn't return partial results, so set status to SUCCESSFUL
255-
.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)
261+
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(resultStatus)
256262
.setTook(tookOnCluster)
257263
.setTotalShards(resp.getTotalShards())
258264
.setSuccessfulShards(resp.getSuccessfulShards())
@@ -267,11 +273,7 @@ private void updateExecutionInfoWithRemoteResponse(String computeClusterAlias, C
267273
tookOnCluster = new TimeValue(remoteTook, TimeUnit.NANOSECONDS);
268274
esqlExecutionInfo.swapCluster(
269275
computeClusterAlias,
270-
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v)
271-
// for now ESQL doesn't return partial results, so set status to SUCCESSFUL
272-
.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)
273-
.setTook(tookOnCluster)
274-
.build()
276+
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(resultStatus).setTook(tookOnCluster).build()
275277
);
276278
}
277279
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,10 @@ final class ComputeResponse extends TransportResponse {
2929
public final int successfulShards;
3030
public final int skippedShards;
3131
public final int failedShards;
32+
public final boolean isPartial;
3233

3334
ComputeResponse(List<DriverProfile> profiles) {
34-
this(profiles, null, null, null, null, null);
35+
this(profiles, null, null, null, null, null, false);
3536
}
3637

3738
ComputeResponse(
@@ -40,14 +41,16 @@ final class ComputeResponse extends TransportResponse {
4041
Integer totalShards,
4142
Integer successfulShards,
4243
Integer skippedShards,
43-
Integer failedShards
44+
Integer failedShards,
45+
boolean isPartial
4446
) {
4547
this.profiles = profiles;
4648
this.took = took;
4749
this.totalShards = totalShards == null ? 0 : totalShards.intValue();
4850
this.successfulShards = successfulShards == null ? 0 : successfulShards.intValue();
4951
this.skippedShards = skippedShards == null ? 0 : skippedShards.intValue();
5052
this.failedShards = failedShards == null ? 0 : failedShards.intValue();
53+
this.isPartial = isPartial;
5154
}
5255

5356
ComputeResponse(StreamInput in) throws IOException {
@@ -74,6 +77,11 @@ final class ComputeResponse extends TransportResponse {
7477
this.skippedShards = 0;
7578
this.failedShards = 0;
7679
}
80+
if (in.getTransportVersion().onOrAfter(TransportVersions.COMPUTE_RESPONSE_PARTIAL)) {
81+
this.isPartial = in.readBoolean();
82+
} else {
83+
this.isPartial = false;
84+
}
7785
}
7886

7987
@Override
@@ -93,6 +101,9 @@ public void writeTo(StreamOutput out) throws IOException {
93101
out.writeVInt(skippedShards);
94102
out.writeVInt(failedShards);
95103
}
104+
if (out.getTransportVersion().onOrAfter(TransportVersions.COMPUTE_RESPONSE_PARTIAL)) {
105+
out.writeBoolean(isPartial);
106+
}
96107
}
97108

98109
public List<DriverProfile> getProfiles() {
@@ -118,4 +129,8 @@ public int getSkippedShards() {
118129
public int getFailedShards() {
119130
return failedShards;
120131
}
132+
133+
public boolean isPartial() {
134+
return isPartial;
135+
}
121136
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,9 +224,7 @@ public void execute(
224224
transportService.getThreadPool().executor(ThreadPool.Names.SEARCH),
225225
ActionListener.runBefore(computeListener.acquireAvoid(), () -> exchangeService.removeExchangeSourceHandler(sessionId))
226226
);
227-
exchangeSource.onFinishEarly(() -> {
228-
execInfo.setPartial();
229-
});
227+
exchangeSource.onFinishEarly(execInfo::markAsPartial);
230228
exchangeService.addExchangeSourceHandler(sessionId, exchangeSource);
231229
try (Releasable ignored = exchangeSource.addEmptySink()) {
232230
// run compute on the coordinator
@@ -802,6 +800,7 @@ private void runComputeOnDataNode(
802800
task.addListener(() -> exchangeService.finishSinkHandler(externalId, new TaskCancelledException(task.getReasonCancelled())));
803801
var exchangeSource = new ExchangeSourceHandler(1, esqlExecutor, computeListener.acquireAvoid());
804802
exchangeSource.addRemoteSink(internalSink::fetchPageAsync, true, 1, ActionListener.noop());
803+
externalSink.setSource(exchangeSource);
805804
ActionListener<ComputeResponse> reductionListener = computeListener.acquireCompute();
806805
runCompute(
807806
task,
@@ -940,6 +939,8 @@ void runComputeOnRemoteCluster(
940939
transportService.getThreadPool().executor(ThreadPool.Names.SEARCH),
941940
computeListener.acquireAvoid()
942941
);
942+
exchangeSink.setSource(exchangeSource);
943+
exchangeSource.onFinishEarly(() -> executionInfo.markClusterAsPartial(clusterAlias));
943944
try (Releasable ignored = exchangeSource.addEmptySink()) {
944945
exchangeSink.addCompletionListener(computeListener.acquireAvoid());
945946
runCompute(

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ private ComputeResponse randomResponse(boolean includeExecutionInfo) {
116116
10,
117117
10,
118118
randomIntBetween(0, 3),
119-
0
119+
0,
120+
false
120121
);
121122
} else {
122123
return new ComputeResponse(profiles);

0 commit comments

Comments
 (0)