Skip to content

Commit d8686a3

Browse files
authored
Deduplicate created objects when deserializing InternalAggregations in SearchResponse (#124296) (#124462)
It should help in case of cross cluster search.
1 parent 2f4167c commit d8686a3

File tree

2 files changed

+26
-8
lines changed

2 files changed

+26
-8
lines changed

server/src/main/java/org/elasticsearch/action/search/SearchResponse.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.common.Strings;
1616
import org.elasticsearch.common.bytes.BytesReference;
1717
import org.elasticsearch.common.collect.Iterators;
18+
import org.elasticsearch.common.io.stream.DelayableWriteable;
1819
import org.elasticsearch.common.io.stream.StreamInput;
1920
import org.elasticsearch.common.io.stream.StreamOutput;
2021
import org.elasticsearch.common.io.stream.Writeable;
@@ -92,7 +93,15 @@ public class SearchResponse extends ActionResponse implements ChunkedToXContentO
9293
public SearchResponse(StreamInput in) throws IOException {
9394
super(in);
9495
this.hits = SearchHits.readFrom(in, true);
95-
this.aggregations = in.readBoolean() ? InternalAggregations.readFrom(in) : null;
96+
if (in.readBoolean()) {
97+
// deserialize the aggregations trying to deduplicate the object created
98+
// TODO: use DelayableWriteable instead.
99+
this.aggregations = InternalAggregations.readFrom(
100+
DelayableWriteable.wrapWithDeduplicatorStreamInput(in, in.getTransportVersion(), in.namedWriteableRegistry())
101+
);
102+
} else {
103+
this.aggregations = null;
104+
}
96105
this.suggest = in.readBoolean() ? new Suggest(in) : null;
97106
this.timedOut = in.readBoolean();
98107
this.terminatedEarly = in.readOptionalBoolean();

server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.TransportVersion;
1313
import org.elasticsearch.common.bytes.BytesReference;
1414
import org.elasticsearch.common.bytes.ReleasableBytesReference;
15+
import org.elasticsearch.core.Nullable;
1516
import org.elasticsearch.core.Releasable;
1617

1718
import java.io.IOException;
@@ -231,16 +232,24 @@ private static <T> T deserialize(
231232
NamedWriteableRegistry registry,
232233
BytesReference serialized
233234
) throws IOException {
234-
try (
235-
StreamInput in = registry == null
236-
? new DeduplicateStreamInput(serialized.streamInput(), new DeduplicatorCache())
237-
: new DeduplicateNamedWriteableAwareStreamInput(serialized.streamInput(), registry, new DeduplicatorCache())
238-
) {
239-
in.setTransportVersion(serializedAtVersion);
240-
return reader.read(in);
235+
try (StreamInput in = serialized.streamInput()) {
236+
return reader.read(wrapWithDeduplicatorStreamInput(in, serializedAtVersion, registry));
241237
}
242238
}
243239

240+
/** Wraps the provided {@link StreamInput} with another stream that extends {@link Deduplicator} */
241+
public static StreamInput wrapWithDeduplicatorStreamInput(
242+
StreamInput in,
243+
TransportVersion serializedAtVersion,
244+
@Nullable NamedWriteableRegistry registry
245+
) {
246+
StreamInput out = registry == null
247+
? new DeduplicateStreamInput(in, new DeduplicatorCache())
248+
: new DeduplicateNamedWriteableAwareStreamInput(in, registry, new DeduplicatorCache());
249+
out.setTransportVersion(serializedAtVersion);
250+
return out;
251+
}
252+
244253
/** An object implementing this interface can deduplicate instance of the provided objects.*/
245254
public interface Deduplicator {
246255
<T> T deduplicate(T object);

0 commit comments

Comments
 (0)