diff --git a/docs/changelog/132453.yaml b/docs/changelog/132453.yaml new file mode 100644 index 0000000000000..393547219efa1 --- /dev/null +++ b/docs/changelog/132453.yaml @@ -0,0 +1,5 @@ +pr: 132453 +summary: Add streaming response to ESQL +area: ES|QL +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/rest/StreamingResponse.java b/server/src/main/java/org/elasticsearch/rest/StreamingResponse.java new file mode 100644 index 0000000000000..f187aacbd347f --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/StreamingResponse.java @@ -0,0 +1,385 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.rest; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.common.CheckedBiConsumer; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; +import org.elasticsearch.common.recycler.Recycler; +import org.elasticsearch.core.AbstractRefCounted; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.RefCounted; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.transport.Transports; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A REST response with any body to which the caller can write fragments in an asynchronous and streaming fashion. + *

+ * Callers submit individual fragments of content using {@link #writeFragment}. Internally, the output entries are held in a queue. + * If the queue becomes empty then the response transmission is paused until the next entry becomes available. + *

+ * The internal queue is unbounded. It is the caller's responsibility to ensure that the response does not consume an excess of resources + * while it's being sent. + *

+ * The caller must eventually call {@link StreamingResponse#close} to finish the transmission of the response. + */ +public final class StreamingResponse implements Releasable { + + private final RestChannel restChannel; + private final String responseContentType; + private final Releasable onCompletion; + private final CheckedBiConsumer writer; + + /** + * A listener for the next fragment to become available for transmission after a pause. Completed with the newly-created unique active + * {@link AvailableFragmentsResponseBodyPart} within {@link #writeFragment}, and subscribed to via + * {@link AvailableFragmentsResponseBodyPart#getNextPart} when the current {@link AvailableFragmentsResponseBodyPart} + * becomes inactive because of a transmission pause. + */ + @Nullable // if the first fragment hasn't been sent yet + private SubscribableListener nextAvailableFragmentListener; + + /** + * A resource to be released when the transmission of the current fragment is complete. Note that we may complete the transmission of + * multiple fragments at the same time, if they are all processed by one call to {@link AvailableFragmentsResponseBodyPart#encodeChunk} + * and transmitted together. + */ + @Nullable // if not currently sending a fragment + private Releasable currentFragmentReleasable; + + /** + * @param restChannel The {@link RestChannel} on which to send the response. + * @param responseContentType The content type of the response. + * @param onCompletion A resource which is released when the transmission is complete. + */ + public StreamingResponse( + RestChannel restChannel, + String responseContentType, + Releasable onCompletion, + CheckedBiConsumer writer + ) { + this.restChannel = restChannel; + this.responseContentType = responseContentType; + this.onCompletion = onCompletion; + this.writer = writer; + } + + /** + * Close this {@link StreamingResponse}, indicating that there will be no more fragments to send. 
+ */ + @Override + public void close() { + writeFragment(NO_MORE_FRAGMENTS, () -> { + if (isRestResponseFinished.compareAndSet(false, true)) { + queueRefs.decRef(); + } + }); + } + + /** + * Enqueue the given fragment for transmission. + * @param fragment The fragment to send. + * @param releasable A resource which is released when the fragment has been completely processed, i.e. when + *

+ */ + public void writeFragment(Iterator fragment, Releasable releasable) { + if (tryAcquireQueueRef()) { + try { + fragmentQueue.add(new StreamingFragment<>(fragment, releasable)); + if (queueLength.getAndIncrement() == 0) { + // There is no active AvailableChunksZipResponseBodyPart, but there is now an entry in the queue, so we must create a + // AvailableChunksZipResponseBodyPart to process it (along with any other entries that are concurrently added to the + // queue). It's safe to mutate releasable and continuationListener here because they are only otherwise accessed by an + // active AvailableChunksZipResponseBodyPart (which does not exist) or when all queueRefs have been released (which they + // have not here). + final var nextFragment = fragmentQueue.poll(); + assert nextFragment != null; + final var availableFragments = new AvailableFragmentsResponseBodyPart(nextFragment.fragment()); + assert currentFragmentReleasable == null; + currentFragmentReleasable = nextFragment.releasable(); + final var currentAvailableFragmentListener = nextAvailableFragmentListener; + nextAvailableFragmentListener = new SubscribableListener<>(); + if (currentAvailableFragmentListener == null) { + // We are not resuming after a pause, this is the first fragment to be sent, so we start the response transmission. + restChannel.sendResponse(RestResponse.chunked(RestStatus.OK, availableFragments, this::restResponseFinished)); + } else { + // We are resuming transmission after a pause, so just carry on sending the response body. + assert currentAvailableFragmentListener.isDone() == false; + currentAvailableFragmentListener.onResponse(availableFragments); + } + } + } finally { + queueRefs.decRef(); + } + } else { + Releasables.closeExpectNoException(releasable); + } + } + + /** + * A fragment which is ready for transmission, to be stored in {@link #fragmentQueue}. + * + * @param fragment The fragment to send. 
+ * @param releasable A resource to release when this fragment has been fully transmitted, or is no longer required because the + * transmission was cancelled. + */ + private record StreamingFragment(Iterator fragment, Releasable releasable) {} + + /** + * Queue of fragments that are ready for transmission. + */ + private final Queue> fragmentQueue = new LinkedBlockingQueue<>(); + + /** + * Upper bound on the number of fragments in the queue, atomically modified to ensure there's only one thread processing the queue + * at once. + */ + private final AtomicInteger queueLength = new AtomicInteger(); + + /** + * Ref-counting for access to the queue, to avoid clearing the queue on abort concurrently with a fragment being sent. + */ + private final RefCounted queueRefs = AbstractRefCounted.of(this::drainQueue); + + /** + * Flag to indicate if the request has been aborted, at which point we should stop enqueueing more fragments and promptly clean up the + * ones being sent. It's safe to ignore this, but without it in theory a constant stream of calls to {@link #writeFragment} could + * prevent {@link #drainQueue} from running for arbitrarily long. 
+ */ + private final AtomicBoolean isRestResponseFinished = new AtomicBoolean(); + + private boolean tryAcquireQueueRef() { + return isRestResponseFinished.get() == false && queueRefs.tryIncRef(); + } + + private void restResponseFinished() { + assert Transports.assertTransportThread(); + if (isRestResponseFinished.compareAndSet(false, true)) { + queueRefs.decRef(); + } + } + + private void drainQueue() { + assert isRestResponseFinished.get(); + assert queueRefs.hasReferences() == false; + final var taskCount = queueLength.get() + 2 /* currentFragmentReleasable and onCompletion */ ; + final var releasables = new ArrayList(taskCount); + try { + releasables.add(currentFragmentReleasable); + currentFragmentReleasable = null; + StreamingFragment fragment; + while ((fragment = fragmentQueue.poll()) != null) { + releasables.add(fragment.releasable()); + } + assert fragmentQueue.isEmpty() : fragmentQueue.size(); // no concurrent adds + assert releasables.size() == taskCount - 1 || releasables.size() == taskCount - 2 : taskCount + " vs " + releasables.size(); + } finally { + releasables.add(onCompletion); + Releasables.closeExpectNoException(Releasables.wrap(releasables)); + } + } + + /** + * A {@link ChunkedRestResponseBodyPart} which will yield all currently-available fragments by consuming from {@link #fragmentQueue}. + * There is only ever at most one active instance of this class at any time, in the sense that one such instance becoming inactive + * happens-before the creation of the next instance. One of these parts may send chunks for more than one fragment. + */ + private final class AvailableFragmentsResponseBodyPart implements ChunkedRestResponseBodyPart { + + /** + * An iterator over the chunks of the fragment currently being transmitted. + */ + private Iterator fragmentChunksIterator; + + /** + * True when we have run out of chunks ready for immediate transmission, so the response is paused, but we expect to send more data + * later. 
+ */ + private boolean isResponsePaused; + + /** + * True when we have sent the last chunk of the last fragment, or the response was cancelled. + */ + private boolean isResponseComplete; + + /** + * A listener which is created when there are no more available fragments, so transmission is paused, subscribed to in + * {@link #getNextPart}, and then completed with the next body part (sequence of fragments, i.e. a new (unique) active + * {@link AvailableFragmentsResponseBodyPart}). + */ + private SubscribableListener getNextPartListener; + + /** + * A cache for an empty list to be used to collect the {@code Releasable} instances to be released when the next chunk has been + * fully transmitted. It's a list because a call to {@link #encodeChunk} may yield a chunk that completes several fragments, each of + * which has its own resources to release. We cache this value across chunks because most chunks won't release anything, so we can + * keep the empty list around for later to save on allocations. + */ + private ArrayList nextReleasablesCache = new ArrayList<>(); + + AvailableFragmentsResponseBodyPart(Iterator fragmentChunksIterator) { + this.fragmentChunksIterator = fragmentChunksIterator; + } + + /** + * @return whether this part of the response is complete + */ + @Override + public boolean isPartComplete() { + return isResponsePaused || isResponseComplete; + } + + @Override + public boolean isLastPart() { + return isResponseComplete; + } + + @Override + public void getNextPart(ActionListener listener) { + assert getNextPartListener != null; + getNextPartListener.addListener(listener); + } + + /** + * Transfer {@link #currentFragmentReleasable} into the supplied collection (i.e. add it to {@code releasables} and then clear + * {@link #currentFragmentReleasable}). 
Called when the last chunk of the current fragment is serialized, so that we + * can start serializing chunks of the next fragment straight away whilst delaying the release of the current fragment's resources + * until the transmission of the chunk that is currently under construction. + */ + private void transferCurrentFragmentReleasable(ArrayList releasables) { + assert queueRefs.hasReferences(); + + if (currentFragmentReleasable == null) { + return; + } + + if (releasables == nextReleasablesCache) { + // adding the first value, so we must line up a new cached value for the next caller + nextReleasablesCache = new ArrayList<>(); + } + + releasables.add(currentFragmentReleasable); + currentFragmentReleasable = null; + } + + @Override + public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) throws IOException { + assert Transports.isTransportThread(Thread.currentThread()); + + final ArrayList releasables = nextReleasablesCache; + assert releasables.isEmpty(); + try { + if (tryAcquireQueueRef()) { + try { + assert queueLength.get() > 0; + // This is the current unique active AvailableFragmentsResponseBodyPart (i.e. queueLength is strictly positive and + // we hold a queueRef), so any concurrent calls to writeFragment() at this point will just add to the queue and + // won't spawn a new AvailableFragmentsResponseBodyPart or mutate any fields. 
+ + final RecyclerBytesStreamOutput chunkStream = new RecyclerBytesStreamOutput(recycler); + + do { + if (fragmentChunksIterator.hasNext()) { + writer.accept(fragmentChunksIterator.next(), chunkStream); + } else { + completeCurrentFragment(releasables); + } + } while (isResponseComplete == false && isResponsePaused == false && chunkStream.size() < sizeHint); + + assert (releasables == nextReleasablesCache) == releasables.isEmpty(); + assert nextReleasablesCache.isEmpty(); + + final Releasable chunkStreamReleasable = () -> Releasables.closeExpectNoException(chunkStream); + final var result = new ReleasableBytesReference( + chunkStream.bytes(), + releasables.isEmpty() + ? chunkStreamReleasable + : Releasables.wrap(Iterators.concat(Iterators.single(chunkStreamReleasable), releasables.iterator())) + ); + return result; + } finally { + queueRefs.decRef(); + } + } else { + // request aborted, nothing more to send (queue is being cleared by queueRefs#closeInternal) + isResponseComplete = true; + return new ReleasableBytesReference(BytesArray.EMPTY, () -> {}); + } + } catch (Exception e) { + logger.error("failure encoding chunk", e); + throw e; + } + } + + private void completeCurrentFragment(ArrayList releasables) throws IOException { + transferCurrentFragmentReleasable(releasables); + final var localNextAvailableFragmentListener = nextAvailableFragmentListener; // read before queue len decr + final var newQueueLength = queueLength.decrementAndGet(); + if (fragmentChunksIterator == NO_MORE_FRAGMENTS) { + isResponseComplete = true; + } else if (newQueueLength == 0) { + isResponsePaused = true; + assert getNextPartListener == null; + assert localNextAvailableFragmentListener != null; + // Calling our getNextPart() will eventually yield the next fragment supplied to writeFragment(): + getNextPartListener = localNextAvailableFragmentListener; + } else { + // The current fragment is complete, and the next fragment is already available, so we start sending its + // chunks too. 
This means we're still the unique active AvailableFragmentsResponseBodyPart. We re-use + // this AvailableFragmentsResponseBodyPart instance rather than creating a new one to avoid unnecessary + // allocations. + + final var nextFragment = fragmentQueue.poll(); + assert nextFragment != null; + currentFragmentReleasable = nextFragment.releasable(); + fragmentChunksIterator = nextFragment.fragment(); + } + } + + @Override + public String getResponseContentTypeString() { + return responseContentType; + } + } + + /** + * Sentinel fragment indicating the end of the response. + */ + private final Iterator NO_MORE_FRAGMENTS = new Iterator<>() { + @Override + public boolean hasNext() { + return false; + } + + @Override + public T next() { + assert false : "not called"; + return null; + } + }; +} diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java index 61cc5c80cbc00..f029ce70cc961 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java @@ -1575,6 +1575,9 @@ protected static Request prepareRequestWithOptions(RequestObjectBuilder requestO if (requestObject.allowPartialResults != null) { request.addParameter("allow_partial_results", String.valueOf(requestObject.allowPartialResults)); } + if (randomBoolean() && hasCapabilities(client(), List.of(EsqlCapabilities.Cap.RESPONSE_STREAM.capabilityName()))) { + request.addParameter("stream", "true"); + } RequestOptions.Builder options = request.getOptions().toBuilder(); options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java 
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index cdef9f8c33cbd..a205a6a0ff885 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -1351,6 +1351,11 @@ public enum Cap { */ L2_NORM_VECTOR_SIMILARITY_FUNCTION(Build.current().isSnapshot()), + /** + * Support for ESQL query response stream. + */ + RESPONSE_STREAM, + /** * Support for the options field of CATEGORIZE. */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java index ffc0c1e670ed5..59f9180545860 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.QueryBuilder; @@ -23,6 +24,7 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.esql.Column; +import org.elasticsearch.xpack.esql.action.stream.EsqlQueryResponseStream; import org.elasticsearch.xpack.esql.parser.QueryParams; import org.elasticsearch.xpack.esql.plugin.EsqlQueryStatus; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; @@ -57,6 +59,12 @@ public class EsqlQueryRequest extends org.elasticsearch.xpack.core.esql.action.E private boolean acceptedPragmaRisks = false; private Boolean allowPartialResults = null; + /** + * If this field is null, the 
request does not support streaming. + */ + @Nullable + private EsqlQueryResponseStream responseStream; + /** * "Tables" provided in the request for use with things like {@code LOOKUP}. */ @@ -247,6 +255,15 @@ public EsqlQueryRequest allowPartialResults(boolean allowPartialResults) { return this; } + @Nullable + public EsqlQueryResponseStream responseStream() { + return responseStream; + } + + public void responseStream(EsqlQueryResponseStream responseStream) { + this.responseStream = responseStream; + } + @Override public Task createTask(TaskId taskId, String type, String action, TaskId parentTaskId, Map headers) { var status = new EsqlQueryStatus(new AsyncExecutionId(UUIDs.randomBase64UUID(), taskId)); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java index 4afb1418b2585..59ee816c41813 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java @@ -49,6 +49,8 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action. 
private final AbstractRefCounted counted = AbstractRefCounted.of(this::closeInternal); public static final String DROP_NULL_COLUMNS_OPTION = "drop_null_columns"; + public static final String ALLOW_PARTIAL_RESULTS_OPTION = "allow_partial_results"; + public static final String STREAM_OPTION = "stream"; private final List columns; private final List pages; @@ -177,7 +179,7 @@ public List columns() { return columns; } - List pages() { + public List pages() { return pages; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java index 3a406de60ace7..c12bfb28156f6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java @@ -48,7 +48,7 @@ public final class ResponseValueUtils { * Returns an iterator of iterators over the values in the given pages. There is one iterator * for each block. */ - public static Iterator> pagesToValues(List dataTypes, List pages) { + public static Iterator> pagesToValues(List dataTypes, Iterable pages) { BytesRef scratch = new BytesRef(); return Iterators.flatMap( pages.iterator(), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseXContentUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseXContentUtils.java index 38b391552cfe2..b6cfa3875c159 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseXContentUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseXContentUtils.java @@ -23,12 +23,12 @@ /** * Collection of static utility methods for helping transform response data to XContent. 
*/ -final class ResponseXContentUtils { +public final class ResponseXContentUtils { /** * Returns the column headings for the given columns. */ - static Iterator allColumns(List columns, String name) { + public static Iterator allColumns(List columns, String name) { return ChunkedToXContentHelper.chunk((builder, params) -> { builder.startArray(name); for (ColumnInfo col : columns) { @@ -42,7 +42,7 @@ static Iterator allColumns(List columns, S * Returns the column headings for the given columns, moving the heading * for always-null columns to a {@code null_columns} section. */ - static Iterator nonNullColumns(List columns, boolean[] nullColumns, String name) { + public static Iterator nonNullColumns(List columns, boolean[] nullColumns, String name) { return ChunkedToXContentHelper.chunk((builder, params) -> { builder.startArray(name); for (int c = 0; c < columns.size(); c++) { @@ -55,7 +55,7 @@ static Iterator nonNullColumns(List column } /** Returns the column values for the given pages (described by the column infos). */ - static Iterator columnValues( + public static Iterator columnValues( List columns, List pages, boolean columnar, @@ -71,7 +71,7 @@ static Iterator columnValues( } /** Returns a columnar based representation of the values in the given pages (described by the column infos). */ - static Iterator columnarValues(List columns, List pages, boolean[] nullColumns) { + public static Iterator columnarValues(List columns, List pages, boolean[] nullColumns) { final BytesRef scratch = new BytesRef(); return Iterators.flatMap(Iterators.forRange(0, columns.size(), column -> { if (nullColumns != null && nullColumns[column]) { @@ -97,7 +97,7 @@ static Iterator columnarValues(List column } /** Returns a row based representation of the values in the given pages (described by the column infos). 
*/ - static Iterator rowValues(List columns, List pages, boolean[] nullColumns) { + public static Iterator rowValues(List columns, Iterable pages, boolean[] nullColumns) { final BytesRef scratch = new BytesRef(); return Iterators.flatMap(pages.iterator(), page -> { final int columnCount = columns.size(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlAsyncQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlAsyncQueryAction.java index 0fd35bc3c455a..9f8db3fad2917 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlAsyncQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlAsyncQueryAction.java @@ -14,6 +14,7 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; @@ -21,7 +22,9 @@ import java.util.Set; import static org.elasticsearch.rest.RestRequest.Method.POST; +import static org.elasticsearch.xpack.esql.action.EsqlQueryResponse.ALLOW_PARTIAL_RESULTS_OPTION; import static org.elasticsearch.xpack.esql.action.EsqlQueryResponse.DROP_NULL_COLUMNS_OPTION; +import static org.elasticsearch.xpack.esql.action.EsqlQueryResponse.STREAM_OPTION; import static org.elasticsearch.xpack.esql.formatter.TextFormat.URL_PARAM_DELIMITER; @ServerlessScope(Scope.PUBLIC) @@ -46,10 +49,30 @@ public Set supportedCapabilities() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { try (XContentParser parser = request.contentOrSourceParamParser()) { - return RestEsqlQueryAction.restChannelConsumer(RequestXContent.parseAsync(parser), request, client); + return restChannelConsumer(RequestXContent.parseAsync(parser), request, client); } } + // TODO: 
Remove and reuse the RestEsqlQueryAction method again if possible + private static RestChannelConsumer restChannelConsumer(EsqlQueryRequest esqlRequest, RestRequest request, NodeClient client) { + final Boolean partialResults = request.paramAsBoolean(ALLOW_PARTIAL_RESULTS_OPTION, null); + // Just to consume the parameter until streaming is implemented + request.paramAsBoolean(STREAM_OPTION, false); + if (partialResults != null) { + esqlRequest.allowPartialResults(partialResults); + } + LOGGER.debug("Beginning execution of ESQL query.\nQuery string: [{}]", esqlRequest.query()); + + return channel -> { + RestCancellableNodeClient cancellableClient = new RestCancellableNodeClient(client, request.getHttpChannel()); + cancellableClient.execute( + EsqlQueryAction.INSTANCE, + esqlRequest, + new EsqlResponseListener(channel, request, esqlRequest).wrapWithLogging() + ); + }; + } + @Override protected Set responseParams() { return Set.of(URL_PARAM_DELIMITER, DROP_NULL_COLUMNS_OPTION); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlQueryAction.java index f86033a4781c1..98a2a0b9057e5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlQueryAction.java @@ -16,12 +16,15 @@ import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xpack.esql.action.stream.EsqlQueryResponseStream; import java.io.IOException; import java.util.List; import java.util.Set; import static org.elasticsearch.rest.RestRequest.Method.POST; +import static org.elasticsearch.xpack.esql.action.EsqlQueryResponse.ALLOW_PARTIAL_RESULTS_OPTION; +import static 
org.elasticsearch.xpack.esql.action.EsqlQueryResponse.STREAM_OPTION; import static org.elasticsearch.xpack.esql.formatter.TextFormat.URL_PARAM_DELIMITER; @ServerlessScope(Scope.PUBLIC) @@ -50,20 +53,19 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli } } - protected static RestChannelConsumer restChannelConsumer(EsqlQueryRequest esqlRequest, RestRequest request, NodeClient client) { - final Boolean partialResults = request.paramAsBoolean("allow_partial_results", null); + private static RestChannelConsumer restChannelConsumer(EsqlQueryRequest esqlRequest, RestRequest request, NodeClient client) { + final Boolean partialResults = request.paramAsBoolean(ALLOW_PARTIAL_RESULTS_OPTION, null); + final boolean shouldStream = request.paramAsBoolean(STREAM_OPTION, false); if (partialResults != null) { esqlRequest.allowPartialResults(partialResults); } LOGGER.debug("Beginning execution of ESQL query.\nQuery string: [{}]", esqlRequest.query()); - + // TODO: Create responseStream here, and add to the request object (?). 
See RestRepositoryVerifyIntegrityAction return channel -> { + final var responseStream = EsqlQueryResponseStream.forMediaType(channel, request, esqlRequest, shouldStream); + esqlRequest.responseStream(responseStream); RestCancellableNodeClient cancellableClient = new RestCancellableNodeClient(client, request.getHttpChannel()); - cancellableClient.execute( - EsqlQueryAction.INSTANCE, - esqlRequest, - new EsqlResponseListener(channel, request, esqlRequest).wrapWithLogging() - ); + cancellableClient.execute(EsqlQueryAction.INSTANCE, esqlRequest, responseStream.completionListener()); }; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/stream/AbstractEsqlQueryResponseStream.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/stream/AbstractEsqlQueryResponseStream.java new file mode 100644 index 0000000000000..d05090543307c --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/stream/AbstractEsqlQueryResponseStream.java @@ -0,0 +1,233 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
package org.elasticsearch.xpack.esql.action.stream;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.xpack.esql.action.ColumnInfoImpl;
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.expression.function.UnsupportedAttribute;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/**
 * Base class for streamed {@link EsqlQueryResponse} responses.
 * <p>
 * Subclasses only provide the per-format chunk generation ({@code doStartResponse},
 * {@code doSendPages}, {@code doFinishResponse}, {@code doHandleException}) and the
 * transport of those chunks ({@code doSendChunks}); this class owns the lifecycle:
 * start → 0..N page batches → finish (or an exception at any point).
 * </p>
 *
 * @param <T> the chunk type produced by the concrete format (e.g. {@code ToXContent})
 *
 * TODO: Should this be thread-safe?
 * TODO: Add something to complete with an error on error. Is this BWC?
 * TODO: Took header wouldn't be available on streaming
 */
abstract class AbstractEsqlQueryResponseStream<T> implements EsqlQueryResponseStream {
    private static final Logger LOGGER = LogManager.getLogger(AbstractEsqlQueryResponseStream.class);

    protected final RestChannel restChannel;
    protected final RestRequest restRequest;
    protected final EsqlQueryRequest esqlRequest;

    /**
     * Whether the starting chunk of the response has been sent.
     * Used on {@link #finishResponse} to decide between streaming the trailer
     * ({@link #doFinishResponse}) and sending the whole response at once
     * ({@link #doSendEverything}).
     */
    private boolean initialStreamChunkSent = false;

    /**
     * Set once the stream is completed (successfully, exceptionally, or via {@link #close()});
     * no further chunks may be written afterwards.
     */
    private boolean finished = false;

    protected AbstractEsqlQueryResponseStream(RestChannel restChannel, RestRequest restRequest, EsqlQueryRequest esqlRequest) {
        this.restChannel = restChannel;
        this.restRequest = restRequest;
        this.esqlRequest = esqlRequest;
    }

    /**
     * Starts the response stream. This is the first method to be called.
     * A no-op when {@link #canBeStreamed()} is false: in that case everything is
     * serialized in one go on {@link #finishResponse}.
     */
    @Override
    public final void startResponse(List<Attribute> schema) {
        assert initialStreamChunkSent == false : "startResponse() called more than once";
        assert finished == false : "startResponse() called on a finished stream";

        if (canBeStreamed() == false) {
            return;
        }

        // TODO: Copied from TransportEsqlQueryAction#toResponse. Deduplicate this code
        List<ColumnInfoImpl> columns = schema.stream().map(c -> {
            List<String> originalTypes;
            if (c instanceof UnsupportedAttribute ua) {
                // Sort the original types so they are easier to test against and prettier.
                originalTypes = new ArrayList<>(ua.originalTypes());
                Collections.sort(originalTypes);
            } else {
                originalTypes = null;
            }
            return new ColumnInfoImpl(c.name(), c.dataType().outputType(), originalTypes);
        }).toList();

        sendChunks(doStartResponse(columns));

        initialStreamChunkSent = true;
    }

    /**
     * Streams a batch of pages. Silently skipped until the initial chunk is sent:
     * on the non-streamed path the pages stay in the {@link EsqlQueryResponse} and
     * are serialized together on {@link #finishResponse}.
     */
    @Override
    public final void sendPages(Iterable<Page> pages) {
        assert finished == false : "sendPages() called on a finished stream";

        if (initialStreamChunkSent) {
            sendChunks(doSendPages(pages));
        }
    }

    /**
     * Completes the response. Keeps a reference to the response until the last
     * chunk has been handed off, releasing it on failure as well.
     */
    @Override
    public final void finishResponse(EsqlQueryResponse response) {
        assert finished == false : "finishResponse() called more than once";

        // TODO: Also, is this closing right? EsqlResponseListener uses releasableFromResponse(), which increments the ref first
        response.mustIncRef();
        Releasable releasable = Releasables.assertOnce(response::decRef);
        boolean success = false;
        try {
            if (initialStreamChunkSent) {
                sendChunks(doFinishResponse(response), releasable);
            } else {
                sendChunks(doSendEverything(response), releasable);
            }
            success = true;
        } finally {
            if (success == false) {
                releasable.close();
            }
            finished = true;
        }
    }

    /**
     * Reports a failure on the stream. Marks the stream finished even when emitting
     * the error chunks itself throws, so nothing else is written to a broken stream.
     */
    @Override
    public final void handleException(Exception e) {
        assert finished == false : "handleException() called on a finished stream";

        // TODO: To be overridden by subclasses. This should append the error to the stream, if possible
        LOGGER.error("Error while streaming response", e);

        try {
            sendChunks(doHandleException(e));
        } finally {
            finished = true;
        }
    }

    // TODO: For error handling, check RestActionListener error listener
    // TODO: Also ensure that we check if the channel is closed at some points (Also see RestActionListener)

    /**
     * Listener completing this stream: delegates to {@link #finishResponse} /
     * {@link #handleException} (which assert the stream is not finished) and
     * releases this stream afterwards.
     */
    @Override
    public ActionListener<EsqlQueryResponse> completionListener() {
        return ActionListener.releaseAfter(new ActionListener<>() {
            @Override
            public void onResponse(EsqlQueryResponse esqlResponse) {
                finishResponse(esqlResponse);
            }

            @Override
            public void onFailure(Exception e) {
                handleException(e);
            }
        }, this);
    }

    /**
     * Returns true if the response can be streamed, false otherwise.
     * <p>
     * Some parameters make the response not streamable, such as {@code columnar} or
     * {@code drop_null_columns}, as the initial chunk can't be calculated until all
     * pages are received.
     * </p>
     */
    protected abstract boolean canBeStreamed();

    /**
     * Returns the chunks to be sent at the beginning of the response. Called once, at the start.
     * <p>Only called if {@link #canBeStreamed()} returns {@code true}.</p>
     */
    protected abstract Iterator<T> doStartResponse(List<ColumnInfoImpl> columns);

    /**
     * Returns the chunks for the given pages. Called 0 to N times, after
     * {@link #doStartResponse} and before {@link #doFinishResponse}.
     * <p>Only called if {@link #canBeStreamed()} returns {@code true}.</p>
     */
    protected abstract Iterator<T> doSendPages(Iterable<Page> pages);

    /**
     * Returns the remaining chunks of the response. Called once, at the end of the response.
     * <p>Only called if {@link #canBeStreamed()} returns {@code true}.</p>
     */
    protected abstract Iterator<T> doFinishResponse(EsqlQueryResponse response);

    /**
     * Returns the chunks to be sent for the given exception.
     * <p>
     * This may be called at any time, so the code should track what was sent already
     * and how to send a meaningful response given the chunks sent in previous calls.
     * </p>
     */
    protected abstract Iterator<T> doHandleException(Exception e);

    /**
     * Returns the chunks of the full response. Called once for the full response.
     * <p>Only called if {@link #canBeStreamed()} returns {@code false}.</p>
     */
    protected Iterator<T> doSendEverything(EsqlQueryResponse response) {
        // TODO: Is this safe? Should this be abstract to ensure proper implementation? Add tests for both streamed and "everything" cases
        return Iterators.concat(doStartResponse(response.columns()), doSendPages(response.pages()), doFinishResponse(response));
    }

    /**
     * Hands the chunks to the underlying transport; must eventually close {@code releasable},
     * even when {@code chunks} is empty.
     */
    protected abstract void doSendChunks(Iterator<T> chunks, Releasable releasable);

    /** Hook for subclasses to release their transport resources. */
    protected void doClose() {}

    /** Concatenates a list of chunk iterators into a single iterator. */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    protected static <T> Iterator<T> asIterator(List<Iterator<T>> chunks) {
        return Iterators.concat(chunks.toArray(Iterator[]::new));
    }

    private void sendChunks(Iterator<T> chunks) {
        sendChunks(chunks, () -> {});
    }

    protected void sendChunks(Iterator<T> chunks, Releasable releasable) {
        doSendChunks(chunks, releasable);
    }

    /** Marks the stream finished even if releasing the transport resources throws. */
    @Override
    public void close() {
        try {
            doClose();
        } finally {
            finished = true;
        }
    }
}
package org.elasticsearch.xpack.esql.action.stream;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.xcontent.MediaType;
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
import org.elasticsearch.xpack.esql.arrow.ArrowFormat;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.formatter.TextFormat;
import org.elasticsearch.xpack.esql.plugin.EsqlMediaTypeParser;

import java.io.IOException;
import java.util.List;

/**
 * Streamed {@link EsqlQueryResponse} response.
 * <p>
 * The expected call sequence is {@link #startResponse}, then any number of
 * {@link #sendPages} calls, then exactly one of {@link #finishResponse} or
 * {@link #handleException}.
 * </p>
 */
public interface EsqlQueryResponseStream extends Releasable {
    /**
     * Picks the stream implementation matching the response media type of the request.
     *
     * @param shouldStream false if streaming should be disabled
     */
    static EsqlQueryResponseStream forMediaType(
        RestChannel restChannel,
        RestRequest restRequest,
        EsqlQueryRequest esqlRequest,
        boolean shouldStream
    ) throws IOException {
        if (shouldStream == false) {
            // TODO: Make this override the canBeStreamed() instead? To avoid duplicating code and keeping the old classes
            return new NonStreamingEsqlQueryResponseStream(restChannel, restRequest, esqlRequest);
        }

        final MediaType mediaType = EsqlMediaTypeParser.getResponseMediaType(restRequest, esqlRequest);

        if (mediaType == ArrowFormat.INSTANCE) {
            // TODO: Add support
            throw new UnsupportedOperationException("Arrow format is not yet supported for streaming");
        }

        // Text formats (CSV, TSV, tables) get the text stream; anything else is XContent.
        return mediaType instanceof TextFormat textFormat
            ? new TextEsqlQueryResponseStream(restChannel, restRequest, esqlRequest, textFormat)
            : new XContentEsqlQueryResponseStream(restChannel, restRequest, esqlRequest);
    }

    /**
     * Starts the response stream.
     * <p>
     * This is the first method to be called, with the initial data of the request,
     * before beginning the computation.
     * </p>
     */
    void startResponse(List<Attribute> schema);

    /**
     * Called after {@link #startResponse}, for each page of results.
     */
    void sendPages(Iterable<Page> pages);

    /**
     * Last method to be called, when the computation is finished.
     */
    void finishResponse(EsqlQueryResponse response);

    /**
     * Called when an exception is thrown at any point, and the request can't be completed.
     */
    void handleException(Exception e);

    /**
     * Returns a listener to be called when the response is completed.
     * <p>
     * This listener takes care of calling both {@link #finishResponse} and {@link #handleException}.
     * </p>
     */
    ActionListener<EsqlQueryResponse> completionListener();
}

+ * Used for backwards compatibility until the other classes are fully backwards-compatible. + * That means returning the correct error on an exception, and the expected body. + *

+ *

+ * This class currently delegates its functionality to {@link EsqlResponseListener}. + *

+ */ +class NonStreamingEsqlQueryResponseStream implements EsqlQueryResponseStream { + + private final ActionListener listener; + + NonStreamingEsqlQueryResponseStream(RestChannel restChannel, RestRequest restRequest, EsqlQueryRequest esqlRequest) { + this.listener = new EsqlResponseListener(restChannel, restRequest, esqlRequest).wrapWithLogging(); + } + + @Override + public void startResponse(List schema) { + // No-op, all done in the listener + } + + @Override + public void sendPages(Iterable pages) { + // No-op, all done in the listener + } + + @Override + public void finishResponse(EsqlQueryResponse response) { + // No-op, all done in the listener + } + + @Override + public void handleException(Exception e) { + // No-op, all done in the listener + } + + @Override + public ActionListener completionListener() { + return listener; + } + + @Override + public void close() {} +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/stream/TextEsqlQueryResponseStream.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/stream/TextEsqlQueryResponseStream.java new file mode 100644 index 0000000000000..d13ceda390883 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/stream/TextEsqlQueryResponseStream.java @@ -0,0 +1,157 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
package org.elasticsearch.xpack.esql.action.stream;

import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.StreamingResponse;
import org.elasticsearch.xpack.esql.action.ColumnInfoImpl;
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
import org.elasticsearch.xpack.esql.action.ResponseValueUtils;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.formatter.TextFormat;

import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import static org.elasticsearch.xpack.esql.action.EsqlQueryResponse.DROP_NULL_COLUMNS_OPTION;

/**
 * Text format (CSV/TSV/table) response stream, writing {@link TextFormat} chunks
 * through a {@link StreamingResponse}.
 */
class TextEsqlQueryResponseStream extends AbstractEsqlQueryResponseStream<CheckedConsumer<Writer, IOException>> {

    // TODO: Maybe create this on startResponse()? Does creating this do something with the response? Can we still safely set headers?
    private final StreamingResponse<CheckedConsumer<Writer, IOException>> streamingResponse;

    /**
     * Stored on {@link #doStartResponse}, and used later when sending pages.
     */
    @Nullable
    private List<ColumnInfoImpl> columns;
    /**
     * Stored on {@link #doStartResponse}, and used later when sending pages.
     */
    @Nullable
    private List<DataType> dataTypes;

    private final TextFormat format;
    private final boolean dropNullColumns;

    /** Sink for {@link #writer}; only non-null while a chunk is being written. */
    private RecyclerBytesStreamOutput currentOutput = null;
    /**
     * Writer that uses currentOutput as a sink.
     * <p>
     * Used to avoid creating new writers for each chunk to write.
     * Similar to what ChunkedRestResponseBodyPart.fromTextChunks() does.
     * </p>
     */
    private final Writer writer = new OutputStreamWriter(new OutputStream() {
        @Override
        public void write(int b) throws IOException {
            assert currentOutput != null;
            currentOutput.write(b);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            assert currentOutput != null;
            currentOutput.write(b, off, len);
        }

        @Override
        public void flush() {
            assert currentOutput != null;
            currentOutput.flush();
        }

        @Override
        public void close() {
            assert currentOutput != null;
            currentOutput.flush();
        }
    }, StandardCharsets.UTF_8);

    TextEsqlQueryResponseStream(RestChannel restChannel, RestRequest restRequest, EsqlQueryRequest esqlRequest, TextFormat format) {
        super(restChannel, restRequest, esqlRequest);

        this.format = format;
        this.dropNullColumns = restRequest.paramAsBoolean(DROP_NULL_COLUMNS_OPTION, false);
        this.streamingResponse = new StreamingResponse<>(restChannel, format.contentType(restRequest), () -> {}, (chunk, streamOutput) -> {
            currentOutput = streamOutput;
            try {
                chunk.accept(writer);
                writer.flush();
            } finally {
                // Always detach the sink, even if the chunk throws: a stale reference
                // would let a later write/flush hit a stream output that may already
                // have been recycled.
                currentOutput = null;
            }
        });
    }

    @Override
    protected boolean canBeStreamed() {
        // drop_null_columns needs all pages before the header can be produced.
        return dropNullColumns == false;
    }

    @Override
    protected Iterator<CheckedConsumer<Writer, IOException>> doStartResponse(List<ColumnInfoImpl> columns) {
        this.columns = columns;
        this.dataTypes = columns.stream().map(ColumnInfoImpl::type).toList();
        return format.formatHeader(restRequest, columns, new boolean[columns.size()]);
    }

    @Override
    protected Iterator<CheckedConsumer<Writer, IOException>> doSendPages(Iterable<Page> pages) {
        assert columns != null && dataTypes != null : "doStartResponse() must run before doSendPages()";
        return format.formatRows(
            restRequest,
            columns,
            () -> ResponseValueUtils.pagesToValues(dataTypes, pages),
            new boolean[dataTypes.size()]
        );
    }

    @Override
    protected Iterator<CheckedConsumer<Writer, IOException>> doFinishResponse(EsqlQueryResponse response) {
        // Text formats have no trailer; doSendChunks() still releases the resource.
        return Collections.emptyIterator();
    }

    @Override
    protected Iterator<CheckedConsumer<Writer, IOException>> doSendEverything(EsqlQueryResponse response) {
        return format.format(restRequest, response);
    }

    @Override
    protected Iterator<CheckedConsumer<Writer, IOException>> doHandleException(Exception e) {
        // TODO: Implement
        return Collections.emptyIterator();
    }

    @Override
    protected void doSendChunks(Iterator<CheckedConsumer<Writer, IOException>> chunks, Releasable releasable) {
        if (chunks.hasNext()) {
            streamingResponse.writeFragment(chunks, releasable);
        } else {
            // Nothing is handed to the streaming response, so the releasable (e.g. the
            // response ref taken in finishResponse()) must be released here or it leaks.
            releasable.close();
        }
    }

    @Override
    protected void doClose() {
        streamingResponse.close();
    }
}
+ */ + +package org.elasticsearch.xpack.esql.action.stream; + +import org.elasticsearch.common.xcontent.ChunkedToXContentHelper; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.StreamingXContentResponse; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xpack.esql.action.ColumnInfoImpl; +import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; +import org.elasticsearch.xpack.esql.action.EsqlQueryRequest; +import org.elasticsearch.xpack.esql.action.EsqlQueryResponse; +import org.elasticsearch.xpack.esql.action.ResponseXContentUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import static org.elasticsearch.xpack.esql.action.EsqlQueryResponse.DROP_NULL_COLUMNS_OPTION; + +/** + * XContent response stream. + */ +class XContentEsqlQueryResponseStream extends AbstractEsqlQueryResponseStream { + + // TODO: Maybe create this on startResponse()? Does creating this do something with the response? Can we still safely set headers? + private final StreamingXContentResponse streamingXContentResponse; + + /** + * Stored on {@link #doStartResponse}, and used later when sending pages. 
+ */ + @Nullable + private List columns; + + private final boolean dropNullColumns; + + XContentEsqlQueryResponseStream(RestChannel restChannel, RestRequest restRequest, EsqlQueryRequest esqlRequest) throws IOException { + super(restChannel, restRequest, esqlRequest); + + this.dropNullColumns = restRequest.paramAsBoolean(DROP_NULL_COLUMNS_OPTION, false); + this.streamingXContentResponse = new StreamingXContentResponse(restChannel, restChannel.request(), () -> {}); + } + + @Override + protected boolean canBeStreamed() { + return dropNullColumns == false && esqlRequest.columnar() == false; + } + + @Override + protected Iterator doStartResponse(List columns) { + assert dropNullColumns == false : "this method doesn't support dropping null columns"; + + this.columns = columns; + + var content = new ArrayList>(3); + + content.add(ChunkedToXContentHelper.startObject()); + content.add(ResponseXContentUtils.allColumns(columns, "columns")); + + // Start the values array, to be filled in + content.add(ChunkedToXContentHelper.startArray("values")); + + return asIterator(content); + } + + @Override + protected Iterator doSendPages(Iterable pages) { + assert columns != null : "columns must be set before sending pages"; + + return ResponseXContentUtils.rowValues(columns, pages, null); + } + + @Override + protected Iterator doFinishResponse(EsqlQueryResponse response) { + var content = new ArrayList>(10); + + // End the values array + content.add(ChunkedToXContentHelper.endArray()); + + var executionInfo = response.getExecutionInfo(); + if (executionInfo != null) { + if (executionInfo.overallTook() != null) { + content.add( + ChunkedToXContentHelper.chunk( + (builder, p) -> builder // + .field("took", executionInfo.overallTook().millis()) + .field(EsqlExecutionInfo.IS_PARTIAL_FIELD.getPreferredName(), executionInfo.isPartial()) + ) + ); + } + if (executionInfo.hasMetadataToReport()) { + content.add(ChunkedToXContentHelper.field("_clusters", executionInfo, restRequest)); + } + } + 
content.add( + ChunkedToXContentHelper.chunk( + (builder, p) -> builder // + .field("documents_found", response.documentsFound()) + .field("values_loaded", response.valuesLoaded()) + ) + ); + + var profile = response.profile(); + if (profile != null) { + content.add(ChunkedToXContentHelper.startObject("profile")); + content.add(ChunkedToXContentHelper.chunk((b, p) -> { + if (executionInfo != null) { + b.field("query", executionInfo.overallTimeSpan()); + b.field("planning", executionInfo.planningTimeSpan()); + } + return b; + })); + content.add(ChunkedToXContentHelper.array("drivers", profile.drivers().iterator(), restRequest)); + content.add(ChunkedToXContentHelper.array("plans", profile.plans().iterator())); + content.add(ChunkedToXContentHelper.endObject()); + } + + content.add(ChunkedToXContentHelper.endObject()); + + return asIterator(content); + } + + @Override + protected Iterator doSendEverything(EsqlQueryResponse response) { + // final Releasable releasable = releasableFromResponse(esqlResponse); + + // TODO: Instead of sendChunks, implement a flush() to attach the response to it? 
Or pass the response to the methods + // TODO: Or make "doX" methods return an Iterator and then concat them all together + + var content = new ArrayList>(3); + boolean[] nullColumns = null; + if (dropNullColumns) { + nullColumns = nullColumns(response.columns(), response.pages()); + content.add(sendStartResponseDroppingNullColumns(response.columns(), nullColumns)); + } else { + content.add(doStartResponse(response.columns())); + } + // doSendPages doesn't work with nullColumns or columnar, so we generate them here directly + content.add(ResponseXContentUtils.columnValues(response.columns(), response.pages(), esqlRequest.columnar(), nullColumns)); + content.add(doFinishResponse(response)); + return asIterator(content); + } + + @Override + protected Iterator doHandleException(Exception e) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + private Iterator sendStartResponseDroppingNullColumns(List columns, boolean[] nullColumns) { + assert dropNullColumns : "this method should only be called when dropping null columns"; + + var content = new ArrayList>(3); + + content.add(ChunkedToXContentHelper.startObject()); + content.add(ResponseXContentUtils.allColumns(columns, "all_columns")); + content.add(ResponseXContentUtils.nonNullColumns(columns, nullColumns, "columns")); + + return asIterator(content); + } + + private boolean[] nullColumns(List columns, List pages) { + boolean[] nullColumns = new boolean[columns.size()]; + for (int c = 0; c < nullColumns.length; c++) { + nullColumns[c] = allColumnsAreNull(pages, c); + } + return nullColumns; + } + + private boolean allColumnsAreNull(List pages, int c) { + for (Page page : pages) { + if (page.getBlock(c).areAllValuesNull() == false) { + return false; + } + } + return true; + } + + @Override + protected void doSendChunks(Iterator chunks, Releasable releasable) { + if (chunks.hasNext()) { + streamingXContentResponse.writeFragment(p0 -> chunks, releasable); + } + } + + @Override + protected void 
doClose() { + streamingXContentResponse.close(); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/formatter/TextFormat.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/formatter/TextFormat.java index 7a7e4677b0dca..09b4ae1334aa9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/formatter/TextFormat.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/formatter/TextFormat.java @@ -12,6 +12,7 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.xcontent.MediaType; import org.elasticsearch.xpack.core.esql.action.ColumnInfo; +import org.elasticsearch.xpack.esql.action.ColumnInfoImpl; import org.elasticsearch.xpack.esql.action.EsqlQueryResponse; import org.elasticsearch.xpack.esql.core.util.StringUtils; @@ -27,6 +28,9 @@ import java.util.Objects; import java.util.Set; import java.util.function.Function; +import java.util.function.Supplier; + +import static org.elasticsearch.xpack.esql.action.EsqlQueryResponse.DROP_NULL_COLUMNS_OPTION; /** * Templating class for displaying ESQL responses in text formats. @@ -40,7 +44,35 @@ public enum TextFormat implements MediaType { @Override public Iterator> format(RestRequest request, EsqlQueryResponse esqlResponse) { boolean dropNullColumns = request.paramAsBoolean(DROP_NULL_COLUMNS_OPTION, false); - return new TextFormatter(esqlResponse, hasHeader(request), dropNullColumns).format(); + return new TextFormatter( + esqlResponse.columns(), + esqlResponse::values, + hasHeader(request), + dropNullColumns ? 
esqlResponse.nullColumns() : new boolean[esqlResponse.columns().size()] + ).format(); + } + + @Override + public Iterator> formatHeader( + RestRequest request, + List columns, + boolean[] dropColumns + ) { + if (hasHeader(request) == false) { + return Collections.emptyIterator(); + } + + return new TextFormatter(columns, Collections::emptyIterator, true, dropColumns).format(); + } + + @Override + public Iterator> formatRows( + RestRequest request, + List columns, + Supplier>> values, + boolean[] dropColumns + ) { + return new TextFormatter(columns, values, false, dropColumns).format(); } @Override @@ -283,21 +315,38 @@ public Set headerValues() { */ public static final String URL_PARAM_FORMAT = "format"; public static final String URL_PARAM_DELIMITER = "delimiter"; - public static final String DROP_NULL_COLUMNS_OPTION = "drop_null_columns"; public Iterator> format(RestRequest request, EsqlQueryResponse esqlResponse) { - final var delimiter = delimiter(request); boolean dropNullColumns = request.paramAsBoolean(DROP_NULL_COLUMNS_OPTION, false); boolean[] dropColumns = dropNullColumns ? esqlResponse.nullColumns() : new boolean[esqlResponse.columns().size()]; return Iterators.concat( // if the header is requested return the info - hasHeader(request) && esqlResponse.columns() != null - ? Iterators.single(writer -> row(writer, esqlResponse.columns().iterator(), ColumnInfo::name, delimiter, dropColumns)) - : Collections.emptyIterator(), - Iterators.map( - esqlResponse.values(), - row -> writer -> row(writer, row, f -> Objects.toString(f, StringUtils.EMPTY), delimiter, dropColumns) - ) + formatHeader(request, esqlResponse.columns(), dropColumns), + formatRows(request, esqlResponse.columns(), esqlResponse::values, dropColumns) + ); + } + + public Iterator> formatHeader( + RestRequest request, + List columns, + boolean[] dropColumns + ) { + final var delimiter = delimiter(request); + return hasHeader(request) && columns != null + ? 
Iterators.single(writer -> row(writer, columns.iterator(), ColumnInfo::name, delimiter, dropColumns)) + : Collections.emptyIterator(); + } + + public Iterator> formatRows( + RestRequest request, + List columns, + Supplier>> values, + boolean[] dropColumns + ) { + final var delimiter = delimiter(request); + return Iterators.map( + values.get(), + row -> writer -> row(writer, row, f -> Objects.toString(f, StringUtils.EMPTY), delimiter, dropColumns) ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/formatter/TextFormatter.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/formatter/TextFormatter.java index 95b46958be351..6cddee11b13b2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/formatter/TextFormatter.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/formatter/TextFormatter.java @@ -9,14 +9,17 @@ import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.xpack.esql.action.ColumnInfoImpl; import org.elasticsearch.xpack.esql.action.EsqlQueryResponse; import java.io.IOException; import java.io.Writer; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Objects; import java.util.function.Function; +import java.util.function.Supplier; /** * Formats {@link EsqlQueryResponse} for the textual representation. 
@@ -27,7 +30,8 @@ public class TextFormatter { */ private static final int MIN_COLUMN_WIDTH = 15; - private final EsqlQueryResponse response; + private final List columns; + private final Supplier>> valuesSupplier; private final int[] width; private final Function FORMATTER = Objects::toString; private final boolean includeHeader; @@ -36,11 +40,16 @@ public class TextFormatter { /** * Create a new {@linkplain TextFormatter} for formatting responses */ - public TextFormatter(EsqlQueryResponse response, boolean includeHeader, boolean dropNullColumns) { - this.response = response; - var columns = response.columns(); + public TextFormatter( + List columns, + Supplier>> valuesSupplier, + boolean includeHeader, + boolean[] dropColumns + ) { + this.columns = columns; + this.valuesSupplier = valuesSupplier; this.includeHeader = includeHeader; - this.dropColumns = dropNullColumns ? response.nullColumns() : new boolean[columns.size()]; + this.dropColumns = dropColumns; // Figure out the column widths: // 1. Start with the widths of the column names width = new int[columns.size()]; @@ -50,7 +59,7 @@ public TextFormatter(EsqlQueryResponse response, boolean includeHeader, boolean } // 2. Expand columns to fit the largest value - var iterator = response.values(); + var iterator = valuesSupplier.get(); while (iterator.hasNext()) { var row = iterator.next(); for (int i = 0; i < width.length; i++) { @@ -67,7 +76,7 @@ public TextFormatter(EsqlQueryResponse response, boolean includeHeader, boolean public Iterator> format() { return Iterators.concat( // The header lines - includeHeader && response.columns().isEmpty() == false ? Iterators.single(this::formatHeader) : Collections.emptyIterator(), + includeHeader && columns.isEmpty() == false ? Iterators.single(this::formatHeader) : Collections.emptyIterator(), // Now format the results. 
formatResults() ); @@ -82,7 +91,7 @@ private void formatHeader(Writer writer) throws IOException { writer.append('|'); } - String name = response.columns().get(i).name(); + String name = columns.get(i).name(); // left padding int leftPadding = (width[i] - name.length()) / 2; writePadding(leftPadding, writer); @@ -105,7 +114,7 @@ private void formatHeader(Writer writer) throws IOException { } private Iterator> formatResults() { - return Iterators.map(response.values(), row -> writer -> { + return Iterators.map(valuesSupplier.get(), row -> writer -> { for (int i = 0; i < width.length; i++) { assert row.hasNext(); if (dropColumns[i]) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index d72fe46169809..fd7d0bf7c006c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -26,6 +26,7 @@ import org.elasticsearch.compute.operator.exchange.ExchangeSink; import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler; import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; @@ -49,6 +50,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.action.EsqlQueryAction; +import org.elasticsearch.xpack.esql.action.stream.EsqlQueryResponseStream; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.FoldContext; import org.elasticsearch.xpack.esql.enrich.EnrichLookupService; @@ -190,6 +192,7 @@ public void execute( Configuration configuration, 
FoldContext foldContext, EsqlExecutionInfo execInfo, + EsqlQueryResponseStream responseStream, ActionListener listener ) { assert ThreadPool.assertCurrentThreadPool( @@ -206,12 +209,37 @@ public void execute( // we have no sub plans, so we can just execute the given plan if (subplans == null || subplans.isEmpty()) { - executePlan(sessionId, rootTask, flags, physicalPlan, configuration, foldContext, execInfo, null, listener, null); + if (responseStream != null) { + responseStream.startResponse(physicalPlan.output()); + } + + executePlan( + sessionId, + rootTask, + flags, + physicalPlan, + configuration, + foldContext, + execInfo, + null, + listener, + responseStream, + null + ); return; } final List collectedPages = Collections.synchronizedList(new ArrayList<>()); - PhysicalPlan mainPlan = new OutputExec(subplansAndMainPlan.v2(), collectedPages::add); + PhysicalPlan mainPlan = new OutputExec(subplansAndMainPlan.v2(), page -> { + collectedPages.add(page); + if (responseStream != null) { + responseStream.sendPages(List.of(page)); + } + }); + + if (responseStream != null) { + responseStream.startResponse(mainPlan.output()); + } listener = listener.delegateResponse((l, e) -> { collectedPages.forEach(p -> Releasables.closeExpectNoException(p::releaseBlocks)); @@ -281,6 +309,7 @@ public void execute( exchangeService.finishSinkHandler(childSessionId, e); subPlanListener.onFailure(e); }), + null, () -> exchangeSink.createExchangeSink(() -> {}) ); } @@ -296,9 +325,10 @@ public void executePlan( Configuration configuration, FoldContext foldContext, EsqlExecutionInfo execInfo, - String profileQualifier, + @Nullable String profileQualifier, ActionListener listener, - Supplier exchangeSinkSupplier + @Nullable EsqlQueryResponseStream responseStream, + @Nullable Supplier exchangeSinkSupplier ) { Tuple coordinatorAndDataNodePlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode( physicalPlan, @@ -312,7 +342,12 @@ public void executePlan( PhysicalPlan coordinatorPlan = 
coordinatorAndDataNodePlan.v1(); if (exchangeSinkSupplier == null) { - coordinatorPlan = new OutputExec(coordinatorAndDataNodePlan.v1(), collectedPages::add); + coordinatorPlan = new OutputExec(coordinatorAndDataNodePlan.v1(), page -> { + collectedPages.add(page); + if (responseStream != null) { + responseStream.sendPages(List.of(page)); + } + }); } PhysicalPlan dataNodePlan = coordinatorAndDataNodePlan.v2(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index a23154c218a61..10e4d91a277e4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -250,6 +250,7 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener