Skip to content

Commit 516abbe

Browse files
committed
Capture partial status
1 parent 124e070 commit 516abbe

File tree

3 files changed

+13
-0
lines changed

3 files changed

+13
-0
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public final class ExchangeSourceHandler {
4545

4646
private final AtomicInteger nextSinkId = new AtomicInteger();
4747
private final Map<Integer, RemoteSink> remoteSinks = ConcurrentCollections.newConcurrentMap();
48+
private Runnable finishEarlyHandler;
4849

4950
/**
5051
* Creates a new ExchangeSourceHandler.
@@ -83,6 +84,10 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionLi
8384
}));
8485
}
8586

87+
public void onFinishEarly(Runnable finishEarlyHandler) {
88+
this.finishEarlyHandler = finishEarlyHandler;
89+
}
90+
8691
private class ExchangeSourceImpl implements ExchangeSource {
8792
private boolean finished;
8893

@@ -311,6 +316,9 @@ public Releasable addEmptySink() {
311316
* @param drainingPages whether to discard pages already fetched in the exchange
312317
*/
313318
public void finishEarly(boolean drainingPages, ActionListener<Void> listener) {
319+
if (finishEarlyHandler != null) {
320+
finishEarlyHandler.run();
321+
}
314322
buffer.finish(drainingPages);
315323
try (EsqlRefCountingListener refs = new EsqlRefCountingListener(listener)) {
316324
for (RemoteSink remoteSink : remoteSinks.values()) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
5656
public static final ParseField FAILED_FIELD = new ParseField("failed");
5757
public static final ParseField DETAILS_FIELD = new ParseField("details");
5858
public static final ParseField TOOK = new ParseField("took");
59+
public static final ParseField IS_PARTIAL_FIELD = new ParseField("is_partial");
5960

6061
// Map key is clusterAlias on the primary querying cluster of a CCS minimize_roundtrips=true query
6162
// The Map itself is immutable after construction - all Clusters will be accounted for at the start of the search.
@@ -241,6 +242,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
241242
b.field(SKIPPED_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.SKIPPED));
242243
b.field(PARTIAL_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.PARTIAL));
243244
b.field(FAILED_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.FAILED));
245+
b.field(IS_PARTIAL_FIELD.getPreferredName(), isPartial);
244246
// each Cluster object defines its own field object name
245247
b.xContentObject("details", clusterInfo.values().iterator());
246248
});

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,9 @@ public void execute(
224224
transportService.getThreadPool().executor(ThreadPool.Names.SEARCH),
225225
ActionListener.runBefore(computeListener.acquireAvoid(), () -> exchangeService.removeExchangeSourceHandler(sessionId))
226226
);
227+
exchangeSource.onFinishEarly(() -> {
228+
execInfo.setPartial();
229+
});
227230
exchangeService.addExchangeSourceHandler(sessionId, exchangeSource);
228231
try (Releasable ignored = exchangeSource.addEmptySink()) {
229232
// run compute on the coordinator

0 commit comments

Comments
 (0)