Skip to content

Commit 0ab2afb

Browse files
authored
Deduplicate BucketOrder when deserializing (elastic#112707)
Deduplicate BucketOrder object by wrapping the StreamInput generated by DelayableWritable objects.
1 parent 8607d40 commit 0ab2afb

File tree

5 files changed

+129
-9
lines changed

5 files changed

+129
-9
lines changed

docs/changelog/112707.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 112707
2+
summary: Deduplicate `BucketOrder` when deserializing
3+
area: Aggregations
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
import java.io.IOException;
1717
import java.io.UncheckedIOException;
18+
import java.util.HashMap;
19+
import java.util.Map;
1820

1921
/**
2022
* A holder for {@link Writeable}s that delays reading the underlying object
@@ -230,11 +232,72 @@ private static <T> T deserialize(
230232
) throws IOException {
231233
try (
232234
StreamInput in = registry == null
233-
? serialized.streamInput()
234-
: new NamedWriteableAwareStreamInput(serialized.streamInput(), registry)
235+
? new DeduplicateStreamInput(serialized.streamInput(), new DeduplicatorCache())
236+
: new DeduplicateNamedWriteableAwareStreamInput(serialized.streamInput(), registry, new DeduplicatorCache())
235237
) {
236238
in.setTransportVersion(serializedAtVersion);
237239
return reader.read(in);
238240
}
239241
}
242+
243+
/** An object implementing this interface can deduplicate instance of the provided objects.*/
244+
public interface Deduplicator {
245+
<T> T deduplicate(T object);
246+
}
247+
248+
private static class DeduplicateStreamInput extends FilterStreamInput implements Deduplicator {
249+
250+
private final Deduplicator deduplicator;
251+
252+
private DeduplicateStreamInput(StreamInput delegate, Deduplicator deduplicator) {
253+
super(delegate);
254+
this.deduplicator = deduplicator;
255+
}
256+
257+
@Override
258+
public <T> T deduplicate(T object) {
259+
return deduplicator.deduplicate(object);
260+
}
261+
}
262+
263+
private static class DeduplicateNamedWriteableAwareStreamInput extends NamedWriteableAwareStreamInput implements Deduplicator {
264+
265+
private final Deduplicator deduplicator;
266+
267+
private DeduplicateNamedWriteableAwareStreamInput(
268+
StreamInput delegate,
269+
NamedWriteableRegistry registry,
270+
Deduplicator deduplicator
271+
) {
272+
super(delegate, registry);
273+
this.deduplicator = deduplicator;
274+
}
275+
276+
@Override
277+
public <T> T deduplicate(T object) {
278+
return deduplicator.deduplicate(object);
279+
}
280+
}
281+
282+
/**
283+
* Implementation of a {@link Deduplicator} cache. It can hold up to 1024 instances.
284+
*/
285+
private static class DeduplicatorCache implements Deduplicator {
286+
287+
private static final int MAX_SIZE = 1024;
288+
// lazily init
289+
private Map<Object, Object> cache = null;
290+
291+
@SuppressWarnings("unchecked")
292+
@Override
293+
public <T> T deduplicate(T object) {
294+
if (cache == null) {
295+
cache = new HashMap<>();
296+
cache.put(object, object);
297+
} else if (cache.size() < MAX_SIZE) {
298+
object = (T) cache.computeIfAbsent(object, o -> o);
299+
}
300+
return object;
301+
}
302+
}
240303
}

server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.search.aggregations;
99

1010
import org.elasticsearch.common.ParsingException;
11+
import org.elasticsearch.common.io.stream.DelayableWriteable;
1112
import org.elasticsearch.common.io.stream.StreamInput;
1213
import org.elasticsearch.common.io.stream.StreamOutput;
1314
import org.elasticsearch.common.logging.DeprecationLogger;
@@ -36,6 +37,7 @@
3637
*/
3738
public abstract class InternalOrder extends BucketOrder {
3839
// TODO merge the contents of this file into BucketOrder. The way it is now is relic.
40+
3941
/**
4042
* {@link Bucket} ordering strategy to sort by a sub-aggregation.
4143
*/
@@ -476,6 +478,10 @@ public static class Streams {
476478
* @throws IOException on error reading from the stream.
477479
*/
478480
public static BucketOrder readOrder(StreamInput in) throws IOException {
481+
return readOrder(in, true);
482+
}
483+
484+
private static BucketOrder readOrder(StreamInput in, boolean dedupe) throws IOException {
479485
byte id = in.readByte();
480486
switch (id) {
481487
case COUNT_DESC_ID:
@@ -489,12 +495,18 @@ public static BucketOrder readOrder(StreamInput in) throws IOException {
489495
case Aggregation.ID:
490496
boolean asc = in.readBoolean();
491497
String key = in.readString();
498+
if (dedupe && in instanceof DelayableWriteable.Deduplicator bo) {
499+
return bo.deduplicate(new Aggregation(key, asc));
500+
}
492501
return new Aggregation(key, asc);
493502
case CompoundOrder.ID:
494503
int size = in.readVInt();
495504
List<BucketOrder> compoundOrder = new ArrayList<>(size);
496505
for (int i = 0; i < size; i++) {
497-
compoundOrder.add(Streams.readOrder(in));
506+
compoundOrder.add(Streams.readOrder(in, false));
507+
}
508+
if (dedupe && in instanceof DelayableWriteable.Deduplicator bo) {
509+
return bo.deduplicate(new CompoundOrder(compoundOrder, false));
498510
}
499511
return new CompoundOrder(compoundOrder, false);
500512
default:

server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.test.TransportVersionUtils;
1515

1616
import java.io.IOException;
17+
import java.util.Objects;
1718

1819
import static java.util.Collections.singletonList;
1920
import static org.hamcrest.Matchers.equalTo;
@@ -57,19 +58,23 @@ public int hashCode() {
5758
}
5859

5960
private static class NamedHolder implements Writeable {
60-
private final Example e;
61+
private final Example e1;
62+
private final Example e2;
6163

6264
NamedHolder(Example e) {
63-
this.e = e;
65+
this.e1 = e;
66+
this.e2 = e;
6467
}
6568

6669
NamedHolder(StreamInput in) throws IOException {
67-
e = in.readNamedWriteable(Example.class);
70+
e1 = ((DelayableWriteable.Deduplicator) in).deduplicate(in.readNamedWriteable(Example.class));
71+
e2 = ((DelayableWriteable.Deduplicator) in).deduplicate(in.readNamedWriteable(Example.class));
6872
}
6973

7074
@Override
7175
public void writeTo(StreamOutput out) throws IOException {
72-
out.writeNamedWriteable(e);
76+
out.writeNamedWriteable(e1);
77+
out.writeNamedWriteable(e2);
7378
}
7479

7580
@Override
@@ -78,12 +83,12 @@ public boolean equals(Object obj) {
7883
return false;
7984
}
8085
NamedHolder other = (NamedHolder) obj;
81-
return e.equals(other.e);
86+
return e1.equals(other.e1) && e2.equals(other.e2);
8287
}
8388

8489
@Override
8590
public int hashCode() {
86-
return e.hashCode();
91+
return Objects.hash(e1, e2);
8792
}
8893
}
8994

@@ -130,6 +135,9 @@ public void testRoundTripFromDelayedWithNamedWriteable() throws IOException {
130135
DelayableWriteable<NamedHolder> original = DelayableWriteable.referencing(n).asSerialized(NamedHolder::new, writableRegistry());
131136
assertTrue(original.isSerialized());
132137
roundTripTestCase(original, NamedHolder::new);
138+
NamedHolder copy = original.expand();
139+
// objects have been deduplicated
140+
assertSame(copy.e1, copy.e2);
133141
}
134142

135143
public void testRoundTripFromDelayedFromOldVersion() throws IOException {

server/src/test/java/org/elasticsearch/search/aggregations/InternalOrderTests.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77
*/
88
package org.elasticsearch.search.aggregations;
99

10+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
11+
import org.elasticsearch.common.io.stream.DelayableWriteable;
12+
import org.elasticsearch.common.io.stream.FilterStreamInput;
13+
import org.elasticsearch.common.io.stream.StreamInput;
1014
import org.elasticsearch.common.io.stream.Writeable.Reader;
1115
import org.elasticsearch.search.aggregations.InternalOrder.CompoundOrder;
1216
import org.elasticsearch.test.AbstractXContentSerializingTestCase;
@@ -116,4 +120,32 @@ protected BucketOrder mutateInstance(BucketOrder instance) {
116120
}
117121
}
118122

123+
public void testInternalOrderDeduplicated() throws IOException {
124+
BucketOrder testInstance = createTestInstance();
125+
try (BytesStreamOutput output = new BytesStreamOutput()) {
126+
instanceWriter().write(output, testInstance);
127+
if (testInstance instanceof CompoundOrder || testInstance instanceof InternalOrder.Aggregation) {
128+
assertNotSame(testInstance, instanceReader().read(output.bytes().streamInput()));
129+
}
130+
StreamInput dedupe = new DeduplicatorStreamInput(output.bytes().streamInput(), testInstance);
131+
assertSame(testInstance, instanceReader().read(dedupe));
132+
}
133+
}
134+
135+
private static class DeduplicatorStreamInput extends FilterStreamInput implements DelayableWriteable.Deduplicator {
136+
137+
private final BucketOrder order;
138+
139+
protected DeduplicatorStreamInput(StreamInput delegate, BucketOrder order) {
140+
super(delegate);
141+
this.order = order;
142+
}
143+
144+
@SuppressWarnings("unchecked")
145+
@Override
146+
public <T> T deduplicate(T object) {
147+
return (T) order;
148+
}
149+
}
150+
119151
}

0 commit comments

Comments
 (0)