Skip to content

Commit d812fb4

Browse files
committed
Collect main plan result only after all subplan listeners have completed
1 parent b028c0a commit d812fb4

File tree

1 file changed

+17
-14
lines changed

1 file changed

+17
-14
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -244,21 +244,24 @@ public void execute(
244244
)
245245
) {
246246
runCompute(rootTask, computeContext, finalMainPlan, localListener.acquireCompute());
247-
}
248247

249-
for (PhysicalPlan subplan : subplans) {
250-
var childSessionId = newChildSession(sessionId);
251-
ExchangeSinkHandler exchangeSink = exchangeService.createSinkHandler(childSessionId, queryPragmas.exchangeBufferSize());
252-
// funnel sub plan pages into the main plan exchange source
253-
mainExchangeSource.addRemoteSink(exchangeSink::fetchPageAsync, true, () -> {}, 1, ActionListener.noop());
254-
executePlan(childSessionId, rootTask, subplan, configuration, foldContext, execInfo, ActionListener.wrap(result -> {
255-
exchangeSink.addCompletionListener(
256-
ActionListener.running(() -> { exchangeService.finishSinkHandler(childSessionId, null); })
257-
);
258-
}, e -> {
259-
exchangeService.finishSinkHandler(childSessionId, e);
260-
finalListener.onFailure(e);
261-
}), () -> exchangeSink.createExchangeSink(() -> {}));
248+
for (PhysicalPlan subplan : subplans) {
249+
var childSessionId = newChildSession(sessionId);
250+
ExchangeSinkHandler exchangeSink = exchangeService.createSinkHandler(childSessionId, queryPragmas.exchangeBufferSize());
251+
// funnel sub plan pages into the main plan exchange source
252+
mainExchangeSource.addRemoteSink(exchangeSink::fetchPageAsync, true, () -> {}, 1, ActionListener.noop());
253+
var subPlanListener = localListener.acquireCompute();
254+
255+
executePlan(childSessionId, rootTask, subplan, configuration, foldContext, execInfo, ActionListener.wrap(result -> {
256+
exchangeSink.addCompletionListener(
257+
ActionListener.running(() -> { exchangeService.finishSinkHandler(childSessionId, null); })
258+
);
259+
subPlanListener.onResponse(result.completionInfo());
260+
}, e -> {
261+
exchangeService.finishSinkHandler(childSessionId, e);
262+
subPlanListener.onFailure(e);
263+
}), () -> exchangeSink.createExchangeSink(() -> {}));
264+
}
262265
}
263266
}
264267
}

0 commit comments

Comments
 (0)