Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,7 @@ private static InternalAggregations aggregate(
int resultSetSize,
AggregationReduceContext reduceContext
) {
interface ReleasableIterator extends Iterator<InternalAggregations>, Releasable {}
try (var aggsIter = new ReleasableIterator() {
try (var aggsIter = new ReleasableIterator<InternalAggregations>() {

private Releasable toRelease;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.action.search;

import org.elasticsearch.core.Releasable;

import java.util.Iterator;

/**
 * An {@link Iterator} that retains resources while it is being consumed. Callers
 * must {@link Releasable#close()} it once the last element obtained from it is no
 * longer needed, including when iteration is abandoned before exhaustion.
 */
public interface ReleasableIterator<T> extends Iterator<T>, Releasable {}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -25,6 +24,8 @@
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.SimpleRefCounted;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -42,6 +43,7 @@
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Base64;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -388,7 +390,7 @@ public Clusters getClusters() {
}

/**
 * Renders this response as a sequence of xcontent chunks wrapped in a top-level
 * object. The returned iterator is {@code Releasable}: it retains a reference
 * to the hits until its chunks are consumed, so callers that stop early must
 * close it to release that reference.
 *
 * @param params rendering parameters forwarded to the chunk producers
 * @return a releasable iterator over the response's xcontent chunks
 */
@Override
public ReleasableIterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
    assert hasReferences();
    return getToXContentIterator(true, params);
}
Expand All @@ -397,17 +399,68 @@ public Iterator<? extends ToXContent> innerToXContentChunked(ToXContent.Params p
return getToXContentIterator(false, params);
}

private Iterator<ToXContent> getToXContentIterator(boolean wrapInObject, ToXContent.Params params) {
return Iterators.concat(
wrapInObject ? ChunkedToXContentHelper.startObject() : Collections.emptyIterator(),
ChunkedToXContentHelper.chunk(SearchResponse.this::headerToXContent),
Iterators.single(clusters),
hits.toXContentChunked(params),
aggregations == null ? Collections.emptyIterator() : ChunkedToXContentHelper.chunk(aggregations),
suggest == null ? Collections.emptyIterator() : ChunkedToXContentHelper.chunk(suggest),
profileResults == null ? Collections.emptyIterator() : ChunkedToXContentHelper.chunk(profileResults),
wrapInObject ? ChunkedToXContentHelper.endObject() : Collections.emptyIterator()
);
/**
 * Builds the chunked-xcontent iterator for this response. A reference to
 * {@code hits} is acquired up front and released exactly once — either by the
 * sentinel chunk that runs after the hits have been serialized, or by
 * {@link ReleasableIterator#close()} when iteration is abandoned early
 * ({@code Releasables.releaseOnce} makes the double path safe).
 *
 * @param wrapInObject whether to emit the surrounding start/end-object chunks
 * @param params       rendering parameters forwarded to the chunk producers
 * @return a releasable iterator over the response chunks; callers must close it
 *         if they do not consume it to exhaustion
 */
private ReleasableIterator<ToXContent> getToXContentIterator(boolean wrapInObject, ToXContent.Params params) {
    final ArrayDeque<Iterator<? extends ToXContent>> iters = new ArrayDeque<>(7);
    if (wrapInObject) {
        iters.addLast(ChunkedToXContentHelper.startObject());
    }
    iters.addLast(ChunkedToXContentHelper.chunk(SearchResponse.this::headerToXContent));
    iters.addLast(ChunkedToXContentHelper.chunk(clusters));
    var hits = this.hits;
    hits.incRef();
    iters.addLast(hits.toXContentChunked(params));
    // Released by the sentinel chunk below once the hits chunks have been
    // consumed, or by close() on early termination — releaseOnce prevents a
    // double decRef when both fire.
    final Releasable releaseHits = Releasables.releaseOnce(hits::decRef);
    iters.addLast(ChunkedToXContentHelper.chunk((b, p) -> {
        releaseHits.close();
        return b;
    }));
    var aggregations = this.aggregations;
    if (aggregations != null) {
        iters.addLast(ChunkedToXContentHelper.chunk(aggregations));
    }
    var suggest = this.suggest;
    if (suggest != null) {
        iters.addLast(ChunkedToXContentHelper.chunk(suggest));
    }
    var profileResults = this.profileResults;
    if (profileResults != null) {
        iters.addLast(ChunkedToXContentHelper.chunk(profileResults));
    }
    if (wrapInObject) {
        iters.addLast(ChunkedToXContentHelper.endObject());
    }
    return new ReleasableIterator<>() {

        @Override
        public void close() {
            // Idempotent thanks to releaseOnce; safe to call at any point.
            releaseHits.close();
        }

        Iterator<? extends ToXContent> current = iters.pollFirst();

        @Override
        public boolean hasNext() {
            while (true) {
                if (current.hasNext()) {
                    return true;
                }
                var c = current = iters.pollFirst();
                if (c == null) {
                    // Park the shared empty iterator so subsequent calls are cheap
                    // and next() below fails with the contractually-required exception.
                    current = Collections.emptyIterator();
                    return false;
                }
            }
        }

        @Override
        public ToXContent next() {
            // hasNext() advances `current` past drained sub-iterators; once fully
            // exhausted it leaves Collections.emptyIterator() in place, whose
            // next() throws NoSuchElementException as the Iterator contract
            // requires. (The previous loop `current = iters.pollFirst()` would
            // instead NPE on a drained iterator.)
            hasNext();
            return current.next();
        }
    };
}

public XContentBuilder headerToXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*/
public class RestChunkedToXContentListener<Response extends ChunkedToXContent> extends RestActionListener<Response> {

private final ToXContent.Params params;
protected final ToXContent.Params params;

public RestChunkedToXContentListener(RestChannel channel) {
this(channel, channel.request());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,22 @@

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.search.ReleasableIterator;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestActions;
Expand All @@ -36,6 +41,7 @@
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.term.TermSuggestionBuilder;
import org.elasticsearch.usage.SearchUsageHolder;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
Expand Down Expand Up @@ -120,7 +126,24 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC

return channel -> {
RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());
cancelClient.execute(TransportSearchAction.TYPE, searchRequest, new RestRefCountedChunkedToXContentListener<>(channel));
cancelClient.execute(TransportSearchAction.TYPE, searchRequest, new RestRefCountedChunkedToXContentListener<>(channel) {
@Override
protected Releasable releasableFromResponse(SearchResponse searchResponse) {
    // NOTE(review): redundant override — it only delegates to super and changes
    // nothing. TODO confirm it isn't a placeholder for a follow-up change, then
    // remove it.
    return super.releasableFromResponse(searchResponse);
}

@Override
protected void processResponse(SearchResponse searchResponse) throws IOException {
    // Render the response through a releasable chunk iterator and hand that same
    // iterator to RestResponse.chunked(...) as the resource to release —
    // presumably it is closed when the response finishes or the channel drops,
    // freeing whatever the iterator retains (e.g. the hits reference).
    // NOTE(review): confirm the release semantics against RestResponse.chunked.
    final ReleasableIterator<? extends ToXContent> iter = searchResponse.toXContentChunked(params);
    channel.sendResponse(
        RestResponse.chunked(
            getRestStatus(searchResponse),
            ChunkedRestResponseBodyPart.fromXContent(p -> iter, params, channel),
            iter
        )
    );
}
});
};
}

Expand Down