Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
a880fe3
WIP: Initial structure
ivancea Aug 5, 2025
fa7e8a9
Update docs/changelog/132453.yaml
ivancea Aug 5, 2025
f188619
Delete docs/changelog/132453.yaml
ivancea Aug 6, 2025
5d7a5ef
Merge branch 'main' into esql-stream-response
ivancea Aug 6, 2025
c036a14
Added error handling and set a completion listener to the action
ivancea Aug 6, 2025
60f0be5
Working stream without streamed pages
ivancea Aug 6, 2025
e8d2f4f
Integrated with the compute service and stream working
ivancea Aug 6, 2025
736e798
[CI] Auto commit changes from spotless
Aug 6, 2025
b810f12
Merge branch 'main' into esql-stream-response
ivancea Aug 7, 2025
b1db640
Update docs/changelog/132453.yaml
ivancea Aug 7, 2025
8db1bd6
Fix async and add stream to EsqlSpecIt tests
ivancea Aug 7, 2025
02ecea7
Changed all to work with Iterators and return them instead of calling…
ivancea Aug 7, 2025
880b41b
[CI] Auto commit changes from spotless
Aug 7, 2025
1dce435
Fix sending duplicated response on non streaming
ivancea Aug 8, 2025
4a8f33f
Moved XContent dependant code to the default stream class, and added …
ivancea Aug 8, 2025
8e9bba3
Added capability to avoid BWC errors
ivancea Aug 8, 2025
45e2654
Merge branch 'main' into esql-stream-response
ivancea Aug 8, 2025
b098a98
Move ToXContent to a generic, and apply it only in the XContent class…
ivancea Aug 8, 2025
5bc5e70
Send single pages
ivancea Aug 8, 2025
54345e4
Revert "Send single pages"
ivancea Aug 8, 2025
f972558
Text format working, pretty raw
ivancea Aug 8, 2025
b9c514c
Merge branch 'main' into esql-stream-response
ivancea Aug 13, 2025
f552750
Fixed test
ivancea Aug 13, 2025
1734318
Merge branch 'main' into esql-stream-response
ivancea Aug 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/132453.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 132453
summary: Add streaming response to ESQL
area: ES|QL
type: enhancement
issues: []
385 changes: 385 additions & 0 deletions server/src/main/java/org/elasticsearch/rest/StreamingResponse.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1575,6 +1575,9 @@ protected static Request prepareRequestWithOptions(RequestObjectBuilder requestO
if (requestObject.allowPartialResults != null) {
request.addParameter("allow_partial_results", String.valueOf(requestObject.allowPartialResults));
}
if (randomBoolean() && hasCapabilities(client(), List.of(EsqlCapabilities.Cap.RESPONSE_STREAM.capabilityName()))) {
request.addParameter("stream", "true");
}

RequestOptions.Builder options = request.getOptions().toBuilder();
options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1351,6 +1351,11 @@ public enum Cap {
*/
L2_NORM_VECTOR_SIMILARITY_FUNCTION(Build.current().isSnapshot()),

/**
* Support for ESQL query response stream.
*/
RESPONSE_STREAM,

/**
* Support for the options field of CATEGORIZE.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
Expand All @@ -23,6 +24,7 @@
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.esql.Column;
import org.elasticsearch.xpack.esql.action.stream.EsqlQueryResponseStream;
import org.elasticsearch.xpack.esql.parser.QueryParams;
import org.elasticsearch.xpack.esql.plugin.EsqlQueryStatus;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
Expand Down Expand Up @@ -57,6 +59,12 @@ public class EsqlQueryRequest extends org.elasticsearch.xpack.core.esql.action.E
private boolean acceptedPragmaRisks = false;
private Boolean allowPartialResults = null;

/**
* If this field is null, the request does not support streaming.
*/
@Nullable
private EsqlQueryResponseStream responseStream;

/**
* "Tables" provided in the request for use with things like {@code LOOKUP}.
*/
Expand Down Expand Up @@ -247,6 +255,15 @@ public EsqlQueryRequest allowPartialResults(boolean allowPartialResults) {
return this;
}

@Nullable
public EsqlQueryResponseStream responseStream() {
return responseStream;
}

public void responseStream(EsqlQueryResponseStream responseStream) {
this.responseStream = responseStream;
}

@Override
public Task createTask(TaskId taskId, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
var status = new EsqlQueryStatus(new AsyncExecutionId(UUIDs.randomBase64UUID(), taskId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
private final AbstractRefCounted counted = AbstractRefCounted.of(this::closeInternal);

public static final String DROP_NULL_COLUMNS_OPTION = "drop_null_columns";
public static final String ALLOW_PARTIAL_RESULTS_OPTION = "allow_partial_results";
public static final String STREAM_OPTION = "stream";

private final List<ColumnInfoImpl> columns;
private final List<Page> pages;
Expand Down Expand Up @@ -177,7 +179,7 @@ public List<ColumnInfoImpl> columns() {
return columns;
}

List<Page> pages() {
public List<Page> pages() {
return pages;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public final class ResponseValueUtils {
* Returns an iterator of iterators over the values in the given pages. There is one iterator
* for each block.
*/
public static Iterator<Iterator<Object>> pagesToValues(List<DataType> dataTypes, List<Page> pages) {
public static Iterator<Iterator<Object>> pagesToValues(List<DataType> dataTypes, Iterable<Page> pages) {
BytesRef scratch = new BytesRef();
return Iterators.flatMap(
pages.iterator(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
/**
* Collection of static utility methods for helping transform response data to XContent.
*/
final class ResponseXContentUtils {
public final class ResponseXContentUtils {

/**
* Returns the column headings for the given columns.
*/
static Iterator<? extends ToXContent> allColumns(List<ColumnInfoImpl> columns, String name) {
public static Iterator<ToXContent> allColumns(List<ColumnInfoImpl> columns, String name) {
return ChunkedToXContentHelper.chunk((builder, params) -> {
builder.startArray(name);
for (ColumnInfo col : columns) {
Expand All @@ -42,7 +42,7 @@ static Iterator<? extends ToXContent> allColumns(List<ColumnInfoImpl> columns, S
* Returns the column headings for the given columns, moving the heading
* for always-null columns to a {@code null_columns} section.
*/
static Iterator<? extends ToXContent> nonNullColumns(List<ColumnInfoImpl> columns, boolean[] nullColumns, String name) {
public static Iterator<ToXContent> nonNullColumns(List<ColumnInfoImpl> columns, boolean[] nullColumns, String name) {
return ChunkedToXContentHelper.chunk((builder, params) -> {
builder.startArray(name);
for (int c = 0; c < columns.size(); c++) {
Expand All @@ -55,7 +55,7 @@ static Iterator<? extends ToXContent> nonNullColumns(List<ColumnInfoImpl> column
}

/** Returns the column values for the given pages (described by the column infos). */
static Iterator<? extends ToXContent> columnValues(
public static Iterator<ToXContent> columnValues(
List<ColumnInfoImpl> columns,
List<Page> pages,
boolean columnar,
Expand All @@ -71,7 +71,7 @@ static Iterator<? extends ToXContent> columnValues(
}

/** Returns a columnar based representation of the values in the given pages (described by the column infos). */
static Iterator<? extends ToXContent> columnarValues(List<ColumnInfoImpl> columns, List<Page> pages, boolean[] nullColumns) {
public static Iterator<ToXContent> columnarValues(List<ColumnInfoImpl> columns, List<Page> pages, boolean[] nullColumns) {
final BytesRef scratch = new BytesRef();
return Iterators.flatMap(Iterators.forRange(0, columns.size(), column -> {
if (nullColumns != null && nullColumns[column]) {
Expand All @@ -97,7 +97,7 @@ static Iterator<? extends ToXContent> columnarValues(List<ColumnInfoImpl> column
}

/** Returns a row based representation of the values in the given pages (described by the column infos). */
static Iterator<? extends ToXContent> rowValues(List<ColumnInfoImpl> columns, List<Page> pages, boolean[] nullColumns) {
public static Iterator<ToXContent> rowValues(List<ColumnInfoImpl> columns, Iterable<Page> pages, boolean[] nullColumns) {
final BytesRef scratch = new BytesRef();
return Iterators.flatMap(pages.iterator(), page -> {
final int columnCount = columns.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.List;
import java.util.Set;

import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.xpack.esql.action.EsqlQueryResponse.ALLOW_PARTIAL_RESULTS_OPTION;
import static org.elasticsearch.xpack.esql.action.EsqlQueryResponse.DROP_NULL_COLUMNS_OPTION;
import static org.elasticsearch.xpack.esql.action.EsqlQueryResponse.STREAM_OPTION;
import static org.elasticsearch.xpack.esql.formatter.TextFormat.URL_PARAM_DELIMITER;

@ServerlessScope(Scope.PUBLIC)
Expand All @@ -46,10 +49,30 @@ public Set<String> supportedCapabilities() {
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
try (XContentParser parser = request.contentOrSourceParamParser()) {
return RestEsqlQueryAction.restChannelConsumer(RequestXContent.parseAsync(parser), request, client);
return restChannelConsumer(RequestXContent.parseAsync(parser), request, client);
}
}

// TODO: Remove and reuse the RestEsqlQueryAction method again if possible
private static RestChannelConsumer restChannelConsumer(EsqlQueryRequest esqlRequest, RestRequest request, NodeClient client) {
final Boolean partialResults = request.paramAsBoolean(ALLOW_PARTIAL_RESULTS_OPTION, null);
// Just to consume the parameter until streaming is implemented
request.paramAsBoolean(STREAM_OPTION, false);
if (partialResults != null) {
esqlRequest.allowPartialResults(partialResults);
}
LOGGER.debug("Beginning execution of ESQL query.\nQuery string: [{}]", esqlRequest.query());

return channel -> {
RestCancellableNodeClient cancellableClient = new RestCancellableNodeClient(client, request.getHttpChannel());
cancellableClient.execute(
EsqlQueryAction.INSTANCE,
esqlRequest,
new EsqlResponseListener(channel, request, esqlRequest).wrapWithLogging()
);
};
}

@Override
protected Set<String> responseParams() {
return Set.of(URL_PARAM_DELIMITER, DROP_NULL_COLUMNS_OPTION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xpack.esql.action.stream.EsqlQueryResponseStream;

import java.io.IOException;
import java.util.List;
import java.util.Set;

import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.xpack.esql.action.EsqlQueryResponse.ALLOW_PARTIAL_RESULTS_OPTION;
import static org.elasticsearch.xpack.esql.action.EsqlQueryResponse.STREAM_OPTION;
import static org.elasticsearch.xpack.esql.formatter.TextFormat.URL_PARAM_DELIMITER;

@ServerlessScope(Scope.PUBLIC)
Expand Down Expand Up @@ -50,20 +53,19 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
}
}

protected static RestChannelConsumer restChannelConsumer(EsqlQueryRequest esqlRequest, RestRequest request, NodeClient client) {
final Boolean partialResults = request.paramAsBoolean("allow_partial_results", null);
private static RestChannelConsumer restChannelConsumer(EsqlQueryRequest esqlRequest, RestRequest request, NodeClient client) {
final Boolean partialResults = request.paramAsBoolean(ALLOW_PARTIAL_RESULTS_OPTION, null);
final boolean shouldStream = request.paramAsBoolean(STREAM_OPTION, false);
if (partialResults != null) {
esqlRequest.allowPartialResults(partialResults);
}
LOGGER.debug("Beginning execution of ESQL query.\nQuery string: [{}]", esqlRequest.query());

// TODO: Create responseStream here, and add to the request object (?). See RestRepositoryVerifyIntegrityAction
return channel -> {
final var responseStream = EsqlQueryResponseStream.forMediaType(channel, request, esqlRequest, shouldStream);
esqlRequest.responseStream(responseStream);
RestCancellableNodeClient cancellableClient = new RestCancellableNodeClient(client, request.getHttpChannel());
cancellableClient.execute(
EsqlQueryAction.INSTANCE,
esqlRequest,
new EsqlResponseListener(channel, request, esqlRequest).wrapWithLogging()
);
cancellableClient.execute(EsqlQueryAction.INSTANCE, esqlRequest, responseStream.completionListener());
};
}

Expand Down
Loading