diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java index 787dc14f6cd96..9cc60e0f525cb 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.io.stream.DelayableWriteable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -92,7 +93,15 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO public SearchResponse(StreamInput in) throws IOException { super(in); this.hits = SearchHits.readFrom(in, true); - this.aggregations = in.readBoolean() ? InternalAggregations.readFrom(in) : null; + if (in.readBoolean()) { + // deserialize the aggregations trying to deduplicate the object created + // TODO: use DelayableWriteable instead. + this.aggregations = InternalAggregations.readFrom( + DelayableWriteable.wrapWithDeduplicatorStreamInput(in, in.getTransportVersion(), in.namedWriteableRegistry()) + ); + } else { + this.aggregations = null; + } this.suggest = in.readBoolean() ? new Suggest(in) : null; this.timedOut = in.readBoolean(); this.terminatedEarly = in.readOptionalBoolean(); diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java index 745fe034a804a..e1161f03cb8a7 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java @@ -12,6 +12,7 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import java.io.IOException; @@ -231,16 +232,24 @@ private static T deserialize( NamedWriteableRegistry registry, BytesReference serialized ) throws IOException { - try ( - StreamInput in = registry == null - ? new DeduplicateStreamInput(serialized.streamInput(), new DeduplicatorCache()) - : new DeduplicateNamedWriteableAwareStreamInput(serialized.streamInput(), registry, new DeduplicatorCache()) - ) { - in.setTransportVersion(serializedAtVersion); - return reader.read(in); + try (StreamInput in = serialized.streamInput()) { + return reader.read(wrapWithDeduplicatorStreamInput(in, serializedAtVersion, registry)); } } + /** Wraps the provided {@link StreamInput} with another stream that extends {@link Deduplicator} */ + public static StreamInput wrapWithDeduplicatorStreamInput( + StreamInput in, + TransportVersion serializedAtVersion, + @Nullable NamedWriteableRegistry registry + ) { + StreamInput out = registry == null + ? new DeduplicateStreamInput(in, new DeduplicatorCache()) + : new DeduplicateNamedWriteableAwareStreamInput(in, registry, new DeduplicatorCache()); + out.setTransportVersion(serializedAtVersion); + return out; + } + /** An object implementing this interface can deduplicate instance of the provided objects.*/ public interface Deduplicator { T deduplicate(T object);