diff --git a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java index a6c15fc6b1a57..6a643baadd165 100644 --- a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java @@ -26,7 +26,6 @@ import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.lucene.search.Queries; import org.elasticsearch.core.Nullable; -import org.elasticsearch.core.Releasable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexService; @@ -51,6 +50,7 @@ import org.elasticsearch.index.search.NestedHelper; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.search.aggregations.SearchContextAggregations; +import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.collapse.CollapseContext; import org.elasticsearch.search.dfs.DfsSearchResult; @@ -861,8 +861,8 @@ public QuerySearchResult queryResult() { return queryResult; } - public void addQuerySearchResultReleasable(Releasable releasable) { - queryResult.addReleasable(releasable); + public void addAggregationContext(AggregationContext aggregationContext) { + queryResult.addAggregationContext(aggregationContext); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 9b1f0b3f2dd0b..79bfb4056df5a 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -1613,7 +1613,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc enableRewriteAggsToFilterByFilter, source.aggregations().isInSortOrderExecutionRequired() ); - context.addQuerySearchResultReleasable(aggContext); + context.addAggregationContext(aggContext); try { final AggregatorFactories factories = source.aggregations().build(aggContext, null); context.aggregations( diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index a66bc7aaa092a..69ed8e72a6510 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -13,13 +13,14 @@ import org.apache.lucene.search.TotalHits; import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.io.stream.DelayableWriteable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.RefCounted; -import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.SimpleRefCounted; import org.elasticsearch.search.DocValueFormat; @@ -28,6 +29,7 @@ import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.internal.ShardSearchContextId; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.profile.SearchProfileDfsPhaseResult; @@ -37,8 +39,6 @@ import org.elasticsearch.transport.LeakTracker; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import static org.elasticsearch.common.lucene.Lucene.readTopDocs; import static org.elasticsearch.common.lucene.Lucene.writeTopDocs; @@ -75,7 +75,7 @@ public final class QuerySearchResult extends SearchPhaseResult { private final RefCounted refCounted; - private final List toRelease; + private final SubscribableListener aggsContextReleased; public QuerySearchResult() { this(false); @@ -99,7 +99,7 @@ public QuerySearchResult(StreamInput in, boolean delayedAggregations) throws IOE readFromWithId(id, in, delayedAggregations); } refCounted = null; - toRelease = null; + aggsContextReleased = null; } public QuerySearchResult(ShardSearchContextId contextId, SearchShardTarget shardTarget, ShardSearchRequest shardSearchRequest) { @@ -107,14 +107,14 @@ public QuerySearchResult(ShardSearchContextId contextId, SearchShardTarget shard setSearchShardTarget(shardTarget); isNull = false; setShardSearchRequest(shardSearchRequest); - this.toRelease = new ArrayList<>(); this.refCounted = LeakTracker.wrap(new SimpleRefCounted()); + this.aggsContextReleased = new SubscribableListener<>(); } private QuerySearchResult(boolean isNull) { this.isNull = isNull; this.refCounted = null; - toRelease = null; + aggsContextReleased = null; } /** @@ -275,16 +275,24 @@ public void releaseAggs() { aggregations.close(); aggregations = null; } + releaseAggsContext(); } - public void addReleasable(Releasable releasable) { - toRelease.add(releasable); + public void addAggregationContext(AggregationContext aggsContext) { + aggsContextReleased.addListener(ActionListener.releasing(aggsContext)); } public void aggregations(InternalAggregations aggregations) { assert this.aggregations == null : "aggregations already set to [" + this.aggregations + "]"; this.aggregations = aggregations == null ? null : DelayableWriteable.referencing(aggregations); hasAggs = aggregations != null; + releaseAggsContext(); + } + + private void releaseAggsContext() { + if (aggsContextReleased != null) { + aggsContextReleased.onResponse(null); + } } @Nullable @@ -547,7 +555,7 @@ public boolean tryIncRef() { public boolean decRef() { if (refCounted != null) { if (refCounted.decRef()) { - Releasables.close(toRelease); + aggsContextReleased.onResponse(null); return true; } return false;