Skip to content

Commit 56c16d5

Browse files
committed
Merge remote-tracking branch 'elastic/main' into allow-partial-results
2 parents 9263829 + bb9c2c3 commit 56c16d5

File tree

4 files changed

+22
-14
lines changed

4 files changed

+22
-14
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,6 @@ tests:
6969
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
7070
method: test {p0=transform/transforms_start_stop/Test start already started transform}
7171
issue: https://github.com/elastic/elasticsearch/issues/98802
72-
- class: org.elasticsearch.action.search.SearchPhaseControllerTests
73-
method: testProgressListener
74-
issue: https://github.com/elastic/elasticsearch/issues/116149
7572
- class: org.elasticsearch.xpack.deprecation.DeprecationHttpIT
7673
method: testDeprecatedSettingsReturnWarnings
7774
issue: https://github.com/elastic/elasticsearch/issues/108628

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
7676
private final transient Long relativeStartNanos; // start time for an ESQL query for calculating took times
7777
private transient TimeValue planningTookTime; // time elapsed since start of query to calling ComputeService.execute
7878
private volatile boolean isPartial; // Does this request have partial results?
79+
private transient volatile boolean isStopped; // Have we received stop command?
7980

8081
public EsqlExecutionInfo(boolean includeCCSMetadata) {
8182
this(Predicates.always(), includeCCSMetadata); // default all clusters to skip_unavailable=true
@@ -321,6 +322,14 @@ public void markAsPartial() {
321322
isPartial = true;
322323
}
323324

325+
public void markAsStopped() {
326+
isStopped = true;
327+
}
328+
329+
public boolean isStopped() {
330+
return isStopped;
331+
}
332+
324333
/**
325334
* Mark this cluster as having partial results.
326335
*/

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -251,15 +251,17 @@ public void execute(
251251
exchangeSource,
252252
cancelQueryOnFailure,
253253
localListener.acquireCompute().map(r -> {
254-
localClusterWasInterrupted.set(execInfo.isPartial());
255-
execInfo.swapCluster(
256-
LOCAL_CLUSTER,
257-
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(r.getTotalShards())
258-
.setSuccessfulShards(r.getSuccessfulShards())
259-
.setSkippedShards(r.getSkippedShards())
260-
.setFailedShards(r.getFailedShards())
261-
.build()
262-
);
254+
localClusterWasInterrupted.set(execInfo.isStopped());
255+
if (execInfo.isCrossClusterSearch() && execInfo.clusterAliases().contains(LOCAL_CLUSTER)) {
256+
execInfo.swapCluster(
257+
LOCAL_CLUSTER,
258+
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(r.getTotalShards())
259+
.setSuccessfulShards(r.getSuccessfulShards())
260+
.setSkippedShards(r.getSkippedShards())
261+
.setFailedShards(r.getFailedShards())
262+
.build()
263+
);
264+
}
263265
return r.getProfiles();
264266
})
265267
);
@@ -290,7 +292,7 @@ public void execute(
290292
private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String clusterAlias, ComputeResponse resp) {
291293
Function<EsqlExecutionInfo.Cluster.Status, EsqlExecutionInfo.Cluster.Status> runningToSuccess = status -> {
292294
if (status == EsqlExecutionInfo.Cluster.Status.RUNNING) {
293-
return executionInfo.isPartial() ? EsqlExecutionInfo.Cluster.Status.PARTIAL : EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
295+
return executionInfo.isStopped() ? EsqlExecutionInfo.Cluster.Status.PARTIAL : EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
294296
} else {
295297
return status;
296298
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncStopAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ private void stopQueryAndReturnResult(Task task, AsyncExecutionId asyncId, Actio
118118
logger.debug("Async stop for task {} - stopping", asyncIdStr);
119119
final EsqlExecutionInfo esqlExecutionInfo = asyncTask.executionInfo();
120120
if (esqlExecutionInfo != null) {
121-
esqlExecutionInfo.markAsPartial();
121+
esqlExecutionInfo.markAsStopped();
122122
}
123123
Runnable getResults = () -> getResultsAction.execute(task, getAsyncResultRequest, listener);
124124
exchangeService.finishSessionEarly(sessionID(asyncId), ActionListener.running(() -> {

0 commit comments

Comments
 (0)