Skip to content

Commit e8d2f4f

Browse files
committed
Integrated with the compute service and stream working
1 parent 60f0be5 commit e8d2f4f

File tree

4 files changed

+56
-7
lines changed

4 files changed

+56
-7
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/stream/DefaultEsqlQueryResponseStream.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ protected boolean canBeStreamed() {
5353
protected void doStartResponse(List<ColumnInfoImpl> columns) {
5454
assert dropNullColumns == false : "this method doesn't support dropping null columns";
5555

56+
this.columns = columns;
57+
5658
var content = new ArrayList<Iterator<? extends ToXContent>>(3);
5759

5860
content.add(ChunkedToXContentHelper.startObject());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/stream/EsqlQueryResponseStream.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,14 @@
2323
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
2424
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
2525
import org.elasticsearch.xpack.esql.arrow.ArrowFormat;
26+
import org.elasticsearch.xpack.esql.core.expression.Attribute;
27+
import org.elasticsearch.xpack.esql.expression.function.UnsupportedAttribute;
2628
import org.elasticsearch.xpack.esql.formatter.TextFormat;
2729
import org.elasticsearch.xpack.esql.plugin.EsqlMediaTypeParser;
2830

2931
import java.io.IOException;
32+
import java.util.ArrayList;
33+
import java.util.Collections;
3034
import java.util.Iterator;
3135
import java.util.List;
3236

@@ -54,7 +58,7 @@ public static EsqlQueryResponseStream forMediaType(
5458
return new NonStreamingEsqlQueryResponseStream(restChannel, restRequest, esqlRequest);
5559
}
5660

57-
MediaType mediaType = EsqlMediaTypeParser.getResponseMediaType(restRequest, XContentType.JSON);
61+
MediaType mediaType = EsqlMediaTypeParser.getResponseMediaType(restRequest, esqlRequest);
5862

5963
if (mediaType instanceof TextFormat) {
6064
// TODO: Add support
@@ -71,6 +75,7 @@ public static EsqlQueryResponseStream forMediaType(
7175
protected final RestRequest restRequest;
7276
protected final EsqlQueryRequest esqlRequest;
7377

78+
// TODO: Maybe create this on startResponse()? Does creating this do something with the response? Can we still safely set headers?
7479
private final StreamingXContentResponse streamingXContentResponse;
7580

7681
/**
@@ -95,14 +100,27 @@ protected EsqlQueryResponseStream(RestChannel restChannel, RestRequest restReque
95100
/**
96101
* Starts the response stream. This is the first method to be called
97102
*/
98-
public final void startResponse(List<ColumnInfoImpl> columns) {
103+
public final void startResponse(List<Attribute> schema) {
99104
assert initialStreamChunkSent : "startResponse() called more than once";
100105
assert finished == false : "sendPages() called on a finished stream";
101106

102107
if (canBeStreamed() == false) {
103108
return;
104109
}
105110

111+
// TODO: Copied from TransportEsqlQueryAction#toResponse. Deduplicate this code
112+
List<ColumnInfoImpl> columns = schema.stream().map(c -> {
113+
List<String> originalTypes;
114+
if (c instanceof UnsupportedAttribute ua) {
115+
// Sort the original types so they are easier to test against and prettier.
116+
originalTypes = new ArrayList<>(ua.originalTypes());
117+
Collections.sort(originalTypes);
118+
} else {
119+
originalTypes = null;
120+
}
121+
return new ColumnInfoImpl(c.name(), c.dataType().outputType(), originalTypes);
122+
}).toList();
123+
106124
doStartResponse(columns);
107125
initialStreamChunkSent = true;
108126
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.compute.operator.exchange.ExchangeSink;
2727
import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler;
2828
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
29+
import org.elasticsearch.core.Nullable;
2930
import org.elasticsearch.core.RefCounted;
3031
import org.elasticsearch.core.Releasable;
3132
import org.elasticsearch.core.Releasables;
@@ -49,6 +50,7 @@
4950
import org.elasticsearch.transport.TransportService;
5051
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
5152
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
53+
import org.elasticsearch.xpack.esql.action.stream.EsqlQueryResponseStream;
5254
import org.elasticsearch.xpack.esql.core.expression.Attribute;
5355
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
5456
import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
@@ -189,6 +191,7 @@ public void execute(
189191
Configuration configuration,
190192
FoldContext foldContext,
191193
EsqlExecutionInfo execInfo,
194+
EsqlQueryResponseStream responseStream,
192195
ActionListener<Result> listener
193196
) {
194197
assert ThreadPool.assertCurrentThreadPool(
@@ -204,12 +207,31 @@ public void execute(
204207

205208
// we have no sub plans, so we can just execute the given plan
206209
if (subplans == null || subplans.isEmpty()) {
207-
executePlan(sessionId, rootTask, flags, physicalPlan, configuration, foldContext, execInfo, null, listener, null);
210+
responseStream.startResponse(physicalPlan.output());
211+
212+
executePlan(
213+
sessionId,
214+
rootTask,
215+
flags,
216+
physicalPlan,
217+
configuration,
218+
foldContext,
219+
execInfo,
220+
null,
221+
listener,
222+
responseStream,
223+
null
224+
);
208225
return;
209226
}
210227

211228
final List<Page> collectedPages = Collections.synchronizedList(new ArrayList<>());
212-
PhysicalPlan mainPlan = new OutputExec(subplansAndMainPlan.v2(), collectedPages::add);
229+
PhysicalPlan mainPlan = new OutputExec(subplansAndMainPlan.v2(), page -> {
230+
collectedPages.add(page);
231+
responseStream.sendPages(List.of(page));
232+
});
233+
234+
responseStream.startResponse(mainPlan.output());
213235

214236
listener = listener.delegateResponse((l, e) -> {
215237
collectedPages.forEach(p -> Releasables.closeExpectNoException(p::releaseBlocks));
@@ -279,6 +301,7 @@ public void execute(
279301
exchangeService.finishSinkHandler(childSessionId, e);
280302
subPlanListener.onFailure(e);
281303
}),
304+
null,
282305
() -> exchangeSink.createExchangeSink(() -> {})
283306
);
284307
}
@@ -294,9 +317,10 @@ public void executePlan(
294317
Configuration configuration,
295318
FoldContext foldContext,
296319
EsqlExecutionInfo execInfo,
297-
String profileQualifier,
320+
@Nullable String profileQualifier,
298321
ActionListener<Result> listener,
299-
Supplier<ExchangeSink> exchangeSinkSupplier
322+
@Nullable EsqlQueryResponseStream responseStream,
323+
@Nullable Supplier<ExchangeSink> exchangeSinkSupplier
300324
) {
301325
Tuple<PhysicalPlan, PhysicalPlan> coordinatorAndDataNodePlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(
302326
physicalPlan,
@@ -310,7 +334,11 @@ public void executePlan(
310334
PhysicalPlan coordinatorPlan = coordinatorAndDataNodePlan.v1();
311335

312336
if (exchangeSinkSupplier == null) {
313-
coordinatorPlan = new OutputExec(coordinatorAndDataNodePlan.v1(), collectedPages::add);
337+
assert responseStream != null : "responseStream must not be null when exchangeSinkSupplier is null";
338+
coordinatorPlan = new OutputExec(coordinatorAndDataNodePlan.v1(), page -> {
339+
collectedPages.add(page);
340+
responseStream.sendPages(List.of(page));
341+
});
314342
}
315343

316344
PhysicalPlan dataNodePlan = coordinatorAndDataNodePlan.v2();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<Es
250250
configuration,
251251
foldCtx,
252252
executionInfo,
253+
request.responseStream(),
253254
resultListener
254255
);
255256
planExecutor.esql(

0 commit comments

Comments
 (0)