Skip to content

Commit 60f0be5

Browse files
committed
Working stream without streamed pages
1 parent c036a14 commit 60f0be5

File tree

5 files changed

+127
-229
lines changed

5 files changed

+127
-229
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseXContentUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public static Iterator<? extends ToXContent> columnarValues(List<ColumnInfoImpl>
9797
}
9898

9999
/** Returns a row based representation of the values in the given pages (described by the column infos). */
100-
public static Iterator<? extends ToXContent> rowValues(List<ColumnInfoImpl> columns, List<Page> pages, boolean[] nullColumns) {
100+
public static Iterator<? extends ToXContent> rowValues(List<ColumnInfoImpl> columns, Iterable<Page> pages, boolean[] nullColumns) {
101101
final BytesRef scratch = new BytesRef();
102102
return Iterators.flatMap(pages.iterator(), page -> {
103103
final int columnCount = columns.size();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlQueryAction.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import org.elasticsearch.rest.action.RestCancellableNodeClient;
1818
import org.elasticsearch.xcontent.XContentParser;
1919
import org.elasticsearch.xpack.esql.action.stream.EsqlQueryResponseStream;
20-
import org.elasticsearch.xpack.esql.action.stream.NonStreamingEsqlQueryResponseStream;
2120

2221
import java.io.IOException;
2322
import java.util.List;
@@ -54,16 +53,14 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
5453

5554
protected static RestChannelConsumer restChannelConsumer(EsqlQueryRequest esqlRequest, RestRequest request, NodeClient client) {
5655
final Boolean partialResults = request.paramAsBoolean("allow_partial_results", null);
57-
final boolean stream = request.paramAsBoolean("stream", false);
56+
final boolean shouldStream = request.paramAsBoolean("stream", false);
5857
if (partialResults != null) {
5958
esqlRequest.allowPartialResults(partialResults);
6059
}
6160
LOGGER.debug("Beginning execution of ESQL query.\nQuery string: [{}]", esqlRequest.query());
6261
// TODO: Create responseStream here, and add to the request object (?). See RestRepositoryVerifyIntegrityAction
6362
return channel -> {
64-
final var responseStream = stream
65-
? EsqlQueryResponseStream.forMediaType(channel, request)
66-
: new NonStreamingEsqlQueryResponseStream(channel, request, esqlRequest);
63+
final var responseStream = EsqlQueryResponseStream.forMediaType(channel, request, esqlRequest, shouldStream);
6764
esqlRequest.responseStream(responseStream);
6865
RestCancellableNodeClient cancellableClient = new RestCancellableNodeClient(client, request.getHttpChannel());
6966
cancellableClient.execute(EsqlQueryAction.INSTANCE, esqlRequest, responseStream.completionListener());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/stream/DefaultEsqlQueryResponseStream.java

Lines changed: 84 additions & 195 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,19 @@
77

88
package org.elasticsearch.xpack.esql.action.stream;
99

10+
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
1011
import org.elasticsearch.compute.data.Page;
12+
import org.elasticsearch.core.Nullable;
1113
import org.elasticsearch.rest.RestChannel;
14+
import org.elasticsearch.rest.RestRequest;
1215
import org.elasticsearch.xcontent.ToXContent;
1316
import org.elasticsearch.xpack.esql.action.ColumnInfoImpl;
17+
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
18+
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
1419
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
1520
import org.elasticsearch.xpack.esql.action.ResponseXContentUtils;
1621

22+
import java.io.IOException;
1723
import java.util.ArrayList;
1824
import java.util.Iterator;
1925
import java.util.List;
@@ -24,52 +30,114 @@
2430
* Default, XContent response stream.
2531
*/
2632
public class DefaultEsqlQueryResponseStream extends EsqlQueryResponseStream {
27-
DefaultEsqlQueryResponseStream(RestChannel restChannel, ToXContent.Params params) {
28-
super(restChannel, params);
33+
/**
34+
* Columns, stored on {@link #doStartResponse}, and used later when sending pages.
35+
*/
36+
@Nullable
37+
private List<ColumnInfoImpl> columns;
38+
39+
private boolean dropNullColumns;
40+
41+
DefaultEsqlQueryResponseStream(RestChannel restChannel, RestRequest restRequest, EsqlQueryRequest esqlRequest) throws IOException {
42+
super(restChannel, restRequest, esqlRequest);
43+
44+
this.dropNullColumns = restRequest.paramAsBoolean(DROP_NULL_COLUMNS_OPTION, false);
2945
}
3046

3147
@Override
3248
protected boolean canBeStreamed() {
33-
boolean dropNullColumns = params.paramAsBoolean(DROP_NULL_COLUMNS_OPTION, false);
34-
return dropNullColumns == false;
49+
return dropNullColumns == false && esqlRequest.columnar() == false;
3550
}
3651

3752
@Override
3853
protected void doStartResponse(List<ColumnInfoImpl> columns) {
39-
assert params.paramAsBoolean(DROP_NULL_COLUMNS_OPTION, false) == false : "this method doesn't support dropping null columns";
54+
assert dropNullColumns == false : "this method doesn't support dropping null columns";
4055

41-
var content = new ArrayList<Iterator<? extends ToXContent>>(1);
56+
var content = new ArrayList<Iterator<? extends ToXContent>>(3);
4257

58+
content.add(ChunkedToXContentHelper.startObject());
4359
content.add(ResponseXContentUtils.allColumns(columns, "columns"));
4460

61+
// Start the values array, to be filled in
62+
content.add(ChunkedToXContentHelper.startArray("values"));
63+
4564
sendChunks(content);
4665
}
4766

4867
@Override
4968
protected void doSendPages(Iterable<Page> pages) {
50-
// TODO: Implement
69+
assert columns != null : "columns must be set before sending pages";
70+
71+
sendChunks(List.of(ResponseXContentUtils.rowValues(columns, pages, null)));
5172
}
5273

5374
@Override
5475
protected void doFinishResponse(EsqlQueryResponse response) {
55-
// TODO: Implement
76+
var content = new ArrayList<Iterator<? extends ToXContent>>(10);
77+
78+
// End the values array
79+
content.add(ChunkedToXContentHelper.endArray());
80+
81+
var executionInfo = response.getExecutionInfo();
82+
if (executionInfo != null) {
83+
if (executionInfo.overallTook() != null) {
84+
content.add(
85+
ChunkedToXContentHelper.chunk(
86+
(builder, p) -> builder //
87+
.field("took", executionInfo.overallTook().millis())
88+
.field(EsqlExecutionInfo.IS_PARTIAL_FIELD.getPreferredName(), executionInfo.isPartial())
89+
)
90+
);
91+
}
92+
if (executionInfo.hasMetadataToReport()) {
93+
content.add(ChunkedToXContentHelper.field("_clusters", executionInfo, restRequest));
94+
}
95+
}
96+
content.add(
97+
ChunkedToXContentHelper.chunk(
98+
(builder, p) -> builder //
99+
.field("documents_found", response.documentsFound())
100+
.field("values_loaded", response.valuesLoaded())
101+
)
102+
);
103+
104+
var profile = response.profile();
105+
if (profile != null) {
106+
content.add(ChunkedToXContentHelper.startObject("profile"));
107+
content.add(ChunkedToXContentHelper.chunk((b, p) -> {
108+
if (executionInfo != null) {
109+
b.field("query", executionInfo.overallTimeSpan());
110+
b.field("planning", executionInfo.planningTimeSpan());
111+
}
112+
return b;
113+
}));
114+
content.add(ChunkedToXContentHelper.array("drivers", profile.drivers().iterator(), restRequest));
115+
content.add(ChunkedToXContentHelper.array("plans", profile.plans().iterator()));
116+
content.add(ChunkedToXContentHelper.endObject());
117+
}
118+
119+
content.add(ChunkedToXContentHelper.endObject());
120+
121+
sendChunks(content);
56122
}
57123

58124
@Override
59125
protected void doSendEverything(EsqlQueryResponse response) {
60-
boolean dropNullColumns = params.paramAsBoolean(DROP_NULL_COLUMNS_OPTION, false);
61-
62126
// TODO: Close the response
63127
// final Releasable releasable = releasableFromResponse(esqlResponse);
64128

65129
// TODO: Instead of sendChunks, implement a flush() to attach the response to it? Or pass the response to the methods
130+
// TODO: Or make "doX" methods return an Iterator<? extends ToXContent> and then concat them all together
66131

132+
boolean[] nullColumns = null;
67133
if (dropNullColumns) {
68-
sendStartResponseDroppingNullColumns(response.columns(), response.pages());
134+
nullColumns = nullColumns(response.columns(), response.pages());
135+
sendStartResponseDroppingNullColumns(response.columns(), nullColumns);
69136
} else {
70137
doStartResponse(response.columns());
71138
}
72-
doSendPages(response.pages());
139+
// doSendPages doesn't work with nullColumns or columnar, so we generate them here directly
140+
sendChunks(List.of(ResponseXContentUtils.columnValues(response.columns(), response.pages(), esqlRequest.columnar(), nullColumns)));
73141
doFinishResponse(response);
74142
}
75143

@@ -78,12 +146,12 @@ protected void doHandleException(Exception e) {
78146
// TODO: Implement this
79147
}
80148

81-
private void sendStartResponseDroppingNullColumns(List<ColumnInfoImpl> columns, List<Page> pages) {
82-
assert params.paramAsBoolean(DROP_NULL_COLUMNS_OPTION, false) : "this method should only be called when dropping null columns";
149+
private void sendStartResponseDroppingNullColumns(List<ColumnInfoImpl> columns, boolean[] nullColumns) {
150+
assert dropNullColumns : "this method should only be called when dropping null columns";
83151

84-
var content = new ArrayList<Iterator<? extends ToXContent>>(2);
152+
var content = new ArrayList<Iterator<? extends ToXContent>>(3);
85153

86-
boolean[] nullColumns = nullColumns(columns, pages);
154+
content.add(ChunkedToXContentHelper.startObject());
87155
content.add(ResponseXContentUtils.allColumns(columns, "all_columns"));
88156
content.add(ResponseXContentUtils.nonNullColumns(columns, nullColumns, "columns"));
89157

@@ -106,183 +174,4 @@ private boolean allColumnsAreNull(List<Page> pages, int c) {
106174
}
107175
return true;
108176
}
109-
110-
// TODO:
111-
/*
112-
On start, check if it's columnar or something else that disable streaming.
113-
If streaming is ok, create the StreamingXContentResponse and write the opening fragment.
114-
If not, leave it null.
115-
116-
On dispose, check if the StreamingXContentResponse is null or not.
117-
If it's null, send the full response directly.
118-
If it's not null, write the final fragment and close the StreamingXContentResponse.
119-
*/
120-
121-
/*
122-
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
123-
boolean dropNullColumns = params.paramAsBoolean(DROP_NULL_COLUMNS_OPTION, false);
124-
boolean[] nullColumns = dropNullColumns ? nullColumns() : null;
125-
126-
var content = new ArrayList<Iterator<? extends ToXContent>>(25);
127-
content.add(ChunkedToXContentHelper.startObject());
128-
if (isAsync) {
129-
content.add(ChunkedToXContentHelper.chunk((builder, p) -> {
130-
if (asyncExecutionId != null) {
131-
builder.field("id", asyncExecutionId);
132-
}
133-
builder.field("is_running", isRunning);
134-
return builder;
135-
}));
136-
}
137-
if (executionInfo != null && executionInfo.overallTook() != null) {
138-
content.add(
139-
ChunkedToXContentHelper.chunk(
140-
(builder, p) -> builder //
141-
.field("took", executionInfo.overallTook().millis())
142-
.field(EsqlExecutionInfo.IS_PARTIAL_FIELD.getPreferredName(), executionInfo.isPartial())
143-
)
144-
);
145-
}
146-
content.add(
147-
ChunkedToXContentHelper.chunk(
148-
(builder, p) -> builder //
149-
.field("documents_found", documentsFound)
150-
.field("values_loaded", valuesLoaded)
151-
)
152-
);
153-
if (dropNullColumns) {
154-
content.add(ResponseXContentUtils.allColumns(columns, "all_columns"));
155-
content.add(ResponseXContentUtils.nonNullColumns(columns, nullColumns, "columns"));
156-
} else {
157-
content.add(ResponseXContentUtils.allColumns(columns, "columns"));
158-
}
159-
content.add(
160-
ChunkedToXContentHelper.array("values", ResponseXContentUtils.columnValues(this.columns, this.pages, columnar, nullColumns))
161-
);
162-
if (executionInfo != null && executionInfo.hasMetadataToReport()) {
163-
content.add(ChunkedToXContentHelper.field("_clusters", executionInfo, params));
164-
}
165-
if (profile != null) {
166-
content.add(ChunkedToXContentHelper.startObject("profile"));
167-
content.add(ChunkedToXContentHelper.chunk((b, p) -> {
168-
if (executionInfo != null) {
169-
b.field("query", executionInfo.overallTimeSpan());
170-
b.field("planning", executionInfo.planningTimeSpan());
171-
}
172-
return b;
173-
}));
174-
content.add(ChunkedToXContentHelper.array("drivers", profile.drivers.iterator(), params));
175-
content.add(ChunkedToXContentHelper.array("plans", profile.plans.iterator()));
176-
content.add(ChunkedToXContentHelper.endObject());
177-
}
178-
content.add(ChunkedToXContentHelper.endObject());
179-
180-
return Iterators.concat(content.toArray(Iterator[]::new));
181-
}*/
182-
183-
/*void startResponse(Releasable releasable) throws IOException {
184-
assert hasReferences();
185-
assert streamingXContentResponse == null;
186-
streamingXContentResponse = new StreamingXContentResponse(restChannel, restChannel.request(), () -> {});
187-
streamingXContentResponse.writeFragment(
188-
p0 -> ChunkedToXContentHelper.chunk((b, p) -> b.startObject().startArray("log")),
189-
releasable
190-
);
191-
}
192-
193-
void writePage(RepositoryVerifyIntegrityResponseChunk chunk, Releasable releasable) {
194-
assert hasReferences();
195-
assert streamingXContentResponse != null;
196-
197-
if (chunk.type() == RepositoryVerifyIntegrityResponseChunk.Type.ANOMALY) {
198-
anomalyCount.incrementAndGet();
199-
}
200-
streamingXContentResponse.writeFragment(
201-
p0 -> ChunkedToXContentHelper.chunk((b, p) -> b.startObject().value(chunk, p).endObject()),
202-
releasable
203-
);
204-
}
205-
206-
void writeChunk(RepositoryVerifyIntegrityResponseChunk chunk, Releasable releasable) {
207-
assert hasReferences();
208-
assert streamingXContentResponse != null;
209-
210-
if (chunk.type() == RepositoryVerifyIntegrityResponseChunk.Type.ANOMALY) {
211-
anomalyCount.incrementAndGet();
212-
}
213-
streamingXContentResponse.writeFragment(
214-
p0 -> ChunkedToXContentHelper.chunk((b, p) -> b.startObject().value(chunk, p).endObject()),
215-
releasable
216-
);
217-
}
218-
219-
@Override
220-
protected void closeInternal() {
221-
try {
222-
assert finalResultListener.isDone();
223-
finalResultListener.addListener(new ActionListener<>() {
224-
@Override
225-
public void onResponse(RepositoryVerifyIntegrityResponse repositoryVerifyIntegrityResponse) {
226-
// success - finish the response with the final results
227-
assert streamingXContentResponse != null;
228-
streamingXContentResponse.writeFragment(
229-
p0 -> ChunkedToXContentHelper.chunk(
230-
(b, p) -> b.endArray()
231-
.startObject("results")
232-
.field("status", repositoryVerifyIntegrityResponse.finalTaskStatus())
233-
.field("final_repository_generation", repositoryVerifyIntegrityResponse.finalRepositoryGeneration())
234-
.field("total_anomalies", anomalyCount.get())
235-
.field(
236-
"result",
237-
anomalyCount.get() == 0
238-
? repositoryVerifyIntegrityResponse
239-
.originalRepositoryGeneration() == repositoryVerifyIntegrityResponse.finalRepositoryGeneration()
240-
? "pass"
241-
: "inconclusive due to concurrent writes"
242-
: "fail"
243-
)
244-
.endObject()
245-
.endObject()
246-
),
247-
() -> {}
248-
);
249-
}
250-
251-
@Override
252-
public void onFailure(Exception e) {
253-
if (streamingXContentResponse != null) {
254-
// failure after starting the response - finish the response with a rendering of the final exception
255-
streamingXContentResponse.writeFragment(
256-
p0 -> ChunkedToXContentHelper.chunk(
257-
(b, p) -> b.endArray()
258-
.startObject("exception")
259-
.value((bb, pp) -> ElasticsearchException.generateFailureXContent(bb, pp, e, true))
260-
.field("status", ExceptionsHelper.status(e))
261-
.endObject()
262-
.endObject()
263-
),
264-
() -> {}
265-
);
266-
} else {
267-
// didn't even get as far as starting to stream the response, must have hit an early exception (e.g. repo not found)
268-
// so we can return this exception directly.
269-
try {
270-
restChannel.sendResponse(new RestResponse(restChannel, e));
271-
} catch (IOException e2) {
272-
e.addSuppressed(e2);
273-
logger.error("error building error response", e);
274-
assert false : e; // shouldn't actually throw anything here
275-
restChannel.request().getHttpChannel().close();
276-
}
277-
}
278-
}
279-
});
280-
} finally {
281-
Releasables.closeExpectNoException(streamingXContentResponse);
282-
}
283-
}
284-
285-
public ActionListener<RepositoryVerifyIntegrityResponse> getCompletionListener() {
286-
return completionListener;
287-
}*/
288177
}

0 commit comments

Comments
 (0)