diff --git a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java index e81d659efe84f..3ebf860f54500 100644 --- a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java +++ b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java @@ -302,8 +302,7 @@ private static InternalAggregations aggregate( int resultSetSize, AggregationReduceContext reduceContext ) { - interface ReleasableIterator extends Iterator, Releasable {} - try (var aggsIter = new ReleasableIterator() { + try (var aggsIter = new ReleasableIterator() { private Releasable toRelease; diff --git a/server/src/main/java/org/elasticsearch/action/search/ReleasableIterator.java b/server/src/main/java/org/elasticsearch/action/search/ReleasableIterator.java new file mode 100644 index 0000000000000..9f623ea8f9c83 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/search/ReleasableIterator.java @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.core.Releasable; + +import java.util.Iterator; + +/** + * Iterator that must be released after the last element consumed from it is not needed any longer. + */ +public interface ReleasableIterator extends Iterator, Releasable {} diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java index 1f676f29e446e..df2c452661e32 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java @@ -14,7 +14,6 @@ import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.DelayableWriteable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -25,6 +24,8 @@ import org.elasticsearch.common.xcontent.ChunkedToXContentObject; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.RefCounted; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; import org.elasticsearch.core.SimpleRefCounted; import org.elasticsearch.core.TimeValue; import org.elasticsearch.rest.RestStatus; @@ -42,6 +43,7 @@ import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; +import java.util.ArrayDeque; import java.util.Base64; import java.util.Collections; import java.util.Iterator; @@ -388,7 +390,7 @@ public Clusters getClusters() { } @Override - public Iterator toXContentChunked(ToXContent.Params params) { + public ReleasableIterator toXContentChunked(ToXContent.Params params) { assert hasReferences(); return getToXContentIterator(true, params); } @@ -397,17 +399,68 @@ public Iterator innerToXContentChunked(ToXContent.Params p return getToXContentIterator(false, params); } - private Iterator getToXContentIterator(boolean wrapInObject, ToXContent.Params params) { - return Iterators.concat( - wrapInObject ? ChunkedToXContentHelper.startObject() : Collections.emptyIterator(), - ChunkedToXContentHelper.chunk(SearchResponse.this::headerToXContent), - Iterators.single(clusters), - hits.toXContentChunked(params), - aggregations == null ? Collections.emptyIterator() : ChunkedToXContentHelper.chunk(aggregations), - suggest == null ? Collections.emptyIterator() : ChunkedToXContentHelper.chunk(suggest), - profileResults == null ? Collections.emptyIterator() : ChunkedToXContentHelper.chunk(profileResults), - wrapInObject ? ChunkedToXContentHelper.endObject() : Collections.emptyIterator() - ); + private ReleasableIterator getToXContentIterator(boolean wrapInObject, ToXContent.Params params) { + final ArrayDeque> iters = new ArrayDeque<>(7); + if (wrapInObject) { + iters.addLast(ChunkedToXContentHelper.startObject()); + } + iters.addLast(ChunkedToXContentHelper.chunk(SearchResponse.this::headerToXContent)); + iters.addLast(ChunkedToXContentHelper.chunk(clusters)); + var hits = this.hits; + hits.incRef(); + iters.addLast(hits.toXContentChunked(params)); + final Releasable releaseHits = Releasables.releaseOnce(hits::decRef); + iters.addLast(ChunkedToXContentHelper.chunk((b, p) -> { + releaseHits.close(); + return b; + })); + var aggregations = this.aggregations; + if (aggregations != null) { + iters.addLast(ChunkedToXContentHelper.chunk(aggregations)); + } + var suggest = this.suggest; + if (suggest != null) { + iters.addLast(ChunkedToXContentHelper.chunk(suggest)); + } + var profileResults = this.profileResults; + if (profileResults != null) { + iters.addLast(ChunkedToXContentHelper.chunk(profileResults)); + } + if (wrapInObject) { + iters.addLast(ChunkedToXContentHelper.endObject()); + } + return new ReleasableIterator<>() { + + @Override + public void close() { + releaseHits.close(); + } + + Iterator current = iters.pollFirst(); + + @Override + public boolean hasNext() { + while (true) { + if (current.hasNext()) { + return true; + } + var c = current = iters.pollFirst(); + if (c == null) { + current = Collections.emptyIterator(); + return false; + } + } + } + + @Override + public ToXContent next() { + while (current.hasNext() == false) { + current = iters.pollFirst(); + } + var res = current.next(); + return res; + } + }; } public XContentBuilder headerToXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/rest/action/RestChunkedToXContentListener.java b/server/src/main/java/org/elasticsearch/rest/action/RestChunkedToXContentListener.java index d7ebf33c0a1ab..dc6b6b5d28081 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/RestChunkedToXContentListener.java +++ b/server/src/main/java/org/elasticsearch/rest/action/RestChunkedToXContentListener.java @@ -25,7 +25,7 @@ */ public class RestChunkedToXContentListener extends RestActionListener { - private final ToXContent.Params params; + protected final ToXContent.Params params; public RestChunkedToXContentListener(RestChannel channel) { this(channel, channel.request()); diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index 5eda47bc32354..b682dd3cd941c 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -11,17 +11,22 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.search.ReleasableIterator; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.common.Strings; import org.elasticsearch.core.Booleans; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; import org.elasticsearch.features.NodeFeature; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.ChunkedRestResponseBodyPart; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.action.RestActions; @@ -36,6 +41,7 @@ import org.elasticsearch.search.suggest.SuggestBuilder; import org.elasticsearch.search.suggest.term.TermSuggestionBuilder; import org.elasticsearch.usage.SearchUsageHolder; +import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentParser; import java.io.IOException; @@ -120,7 +126,24 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC return channel -> { RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel()); - cancelClient.execute(TransportSearchAction.TYPE, searchRequest, new RestRefCountedChunkedToXContentListener<>(channel)); + cancelClient.execute(TransportSearchAction.TYPE, searchRequest, new RestRefCountedChunkedToXContentListener<>(channel) { + @Override + protected Releasable releasableFromResponse(SearchResponse searchResponse) { + return super.releasableFromResponse(searchResponse); + } + + @Override + protected void processResponse(SearchResponse searchResponse) throws IOException { + final ReleasableIterator iter = searchResponse.toXContentChunked(params); + channel.sendResponse( + RestResponse.chunked( + getRestStatus(searchResponse), + ChunkedRestResponseBodyPart.fromXContent(p -> iter, params, channel), + iter + ) + ); + } + }); }; }