Skip to content

Commit b6ecc10

Browse files
committed
Add empty sink to prevent exchange source to finish too early
1 parent 0d83c20 commit b6ecc10

File tree

1 file changed

+40
-40
lines changed

1 file changed

+40
-40
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 40 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -204,49 +204,49 @@ public void execute(
204204
);
205205

206206
exchangeService.addExchangeSourceHandler(mainSessionId, mainExchangeSource);
207-
var finalListener = ActionListener.runBefore(listener, () -> exchangeService.removeExchangeSourceHandler(sessionId));
208-
209-
for (PhysicalPlan subplan : subplans) {
210-
var childSessionId = newChildSession(sessionId);
211-
ExchangeSinkHandler exchangeSink = exchangeService.createSinkHandler(childSessionId, queryPragmas.exchangeBufferSize());
212-
// funnel sub plan pages into the main plan exchange source
213-
mainExchangeSource.addRemoteSink(exchangeSink::fetchPageAsync, true, () -> {}, 1, ActionListener.noop());
214-
executePlan(childSessionId, rootTask, subplan, configuration, foldContext, execInfo, ActionListener.wrap(result -> {
215-
exchangeSink.addCompletionListener(
216-
ActionListener.running(() -> { exchangeService.finishSinkHandler(childSessionId, null); })
217-
);
218-
}, e -> {
219-
exchangeService.finishSinkHandler(childSessionId, e);
220-
finalListener.onFailure(e);
221-
}), () -> exchangeSink.createExchangeSink(() -> {}));
222-
}
207+
try (var ignored = mainExchangeSource.addEmptySink()) {
208+
var finalListener = ActionListener.runBefore(listener, () -> exchangeService.removeExchangeSourceHandler(sessionId));
209+
var computeContext = new ComputeContext(
210+
mainSessionId,
211+
"single",
212+
LOCAL_CLUSTER,
213+
List.of(),
214+
configuration,
215+
foldContext,
216+
mainExchangeSource::createExchangeSource,
217+
null
218+
);
223219

224-
var computeContext = new ComputeContext(
225-
newChildSession(sessionId),
226-
"single",
227-
LOCAL_CLUSTER,
228-
List.of(),
229-
configuration,
230-
foldContext,
231-
mainExchangeSource::createExchangeSource,
232-
null
233-
);
220+
Runnable cancelQueryOnFailure = cancelQueryOnFailure(rootTask);
221+
PhysicalPlan finalMainPlan = mainPlan;
234222

235-
Runnable cancelQueryOnFailure = cancelQueryOnFailure(rootTask);
223+
try (
224+
ComputeListener localListener = new ComputeListener(
225+
transportService.getThreadPool(),
226+
cancelQueryOnFailure,
227+
finalListener.map(profiles -> {
228+
execInfo.markEndQuery();
229+
return new Result(finalMainPlan.output(), collectedPages, profiles, execInfo);
230+
})
231+
)
232+
) {
233+
runCompute(rootTask, computeContext, finalMainPlan, localListener.acquireCompute());
234+
}
236235

237-
PhysicalPlan finalMainPlan = mainPlan;
238-
239-
try (
240-
ComputeListener localListener = new ComputeListener(
241-
transportService.getThreadPool(),
242-
cancelQueryOnFailure,
243-
finalListener.map(profiles -> {
244-
execInfo.markEndQuery();
245-
return new Result(finalMainPlan.output(), collectedPages, profiles, execInfo);
246-
})
247-
)
248-
) {
249-
runCompute(rootTask, computeContext, finalMainPlan, localListener.acquireCompute());
236+
for (PhysicalPlan subplan : subplans) {
237+
var childSessionId = newChildSession(sessionId);
238+
ExchangeSinkHandler exchangeSink = exchangeService.createSinkHandler(childSessionId, queryPragmas.exchangeBufferSize());
239+
// funnel sub plan pages into the main plan exchange source
240+
mainExchangeSource.addRemoteSink(exchangeSink::fetchPageAsync, true, () -> {}, 1, ActionListener.noop());
241+
executePlan(childSessionId, rootTask, subplan, configuration, foldContext, execInfo, ActionListener.wrap(result -> {
242+
exchangeSink.addCompletionListener(
243+
ActionListener.running(() -> { exchangeService.finishSinkHandler(childSessionId, null); })
244+
);
245+
}, e -> {
246+
exchangeService.finishSinkHandler(childSessionId, e);
247+
finalListener.onFailure(e);
248+
}), () -> exchangeSink.createExchangeSink(() -> {}));
249+
}
250250
}
251251
}
252252

0 commit comments

Comments
 (0)