1313import org .apache .lucene .search .TotalHits ;
1414import org .elasticsearch .TransportVersion ;
1515import org .elasticsearch .TransportVersions ;
16+ import org .elasticsearch .action .ActionListener ;
17+ import org .elasticsearch .action .support .SubscribableListener ;
1618import org .elasticsearch .common .io .stream .DelayableWriteable ;
1719import org .elasticsearch .common .io .stream .StreamInput ;
1820import org .elasticsearch .common .io .stream .StreamOutput ;
1921import org .elasticsearch .common .lucene .search .TopDocsAndMaxScore ;
2022import org .elasticsearch .core .Nullable ;
2123import org .elasticsearch .core .RefCounted ;
22- import org .elasticsearch .core .Releasable ;
2324import org .elasticsearch .core .Releasables ;
2425import org .elasticsearch .core .SimpleRefCounted ;
2526import org .elasticsearch .search .DocValueFormat ;
2829import org .elasticsearch .search .SearchShardTarget ;
2930import org .elasticsearch .search .aggregations .InternalAggregation ;
3031import org .elasticsearch .search .aggregations .InternalAggregations ;
32+ import org .elasticsearch .search .aggregations .support .AggregationContext ;
3133import org .elasticsearch .search .internal .ShardSearchContextId ;
3234import org .elasticsearch .search .internal .ShardSearchRequest ;
3335import org .elasticsearch .search .profile .SearchProfileDfsPhaseResult ;
3739import org .elasticsearch .transport .LeakTracker ;
3840
3941import java .io .IOException ;
40- import java .util .ArrayList ;
41- import java .util .List ;
4242
4343import static org .elasticsearch .common .lucene .Lucene .readTopDocs ;
4444import static org .elasticsearch .common .lucene .Lucene .writeTopDocs ;
@@ -75,7 +75,7 @@ public final class QuerySearchResult extends SearchPhaseResult {
7575
7676 private final RefCounted refCounted ;
7777
78- private final List < Releasable > toRelease ;
78+ private final SubscribableListener < Void > aggsContextReleased ;
7979
8080 public QuerySearchResult () {
8181 this (false );
@@ -99,22 +99,22 @@ public QuerySearchResult(StreamInput in, boolean delayedAggregations) throws IOE
9999 readFromWithId (id , in , delayedAggregations );
100100 }
101101 refCounted = null ;
102- toRelease = null ;
102+ aggsContextReleased = null ;
103103 }
104104
105105 public QuerySearchResult (ShardSearchContextId contextId , SearchShardTarget shardTarget , ShardSearchRequest shardSearchRequest ) {
106106 this .contextId = contextId ;
107107 setSearchShardTarget (shardTarget );
108108 isNull = false ;
109109 setShardSearchRequest (shardSearchRequest );
110- this .toRelease = new ArrayList <>();
111110 this .refCounted = LeakTracker .wrap (new SimpleRefCounted ());
111+ this .aggsContextReleased = new SubscribableListener <>();
112112 }
113113
114114 private QuerySearchResult (boolean isNull ) {
115115 this .isNull = isNull ;
116116 this .refCounted = null ;
117- toRelease = null ;
117+ aggsContextReleased = null ;
118118 }
119119
120120 /**
@@ -275,16 +275,24 @@ public void releaseAggs() {
275275 aggregations .close ();
276276 aggregations = null ;
277277 }
278+ releaseAggsContext ();
278279 }
279280
280- public void addReleasable ( Releasable releasable ) {
281- toRelease . add ( releasable );
281+ public void addAggregationContext ( AggregationContext aggsContext ) {
282+ aggsContextReleased . addListener ( ActionListener . releasing ( aggsContext ) );
282283 }
283284
284285 public void aggregations (InternalAggregations aggregations ) {
285286 assert this .aggregations == null : "aggregations already set to [" + this .aggregations + "]" ;
286287 this .aggregations = aggregations == null ? null : DelayableWriteable .referencing (aggregations );
287288 hasAggs = aggregations != null ;
289+ releaseAggsContext ();
290+ }
291+
292+ private void releaseAggsContext () {
293+ if (aggsContextReleased != null ) {
294+ aggsContextReleased .onResponse (null );
295+ }
288296 }
289297
290298 @ Nullable
@@ -547,7 +555,7 @@ public boolean tryIncRef() {
547555 public boolean decRef () {
548556 if (refCounted != null ) {
549557 if (refCounted .decRef ()) {
550- Releasables . close ( toRelease );
558+ aggsContextReleased . onResponse ( null );
551559 return true ;
552560 }
553561 return false ;
0 commit comments