Skip to content

Commit 453db3f

Browse files
Optimize InternalAggregations construction a little (#120868)
We can streamline and optimize this logic a little to see less copying and more compact results.
1 parent ddc2362 commit 453db3f

File tree

18 files changed

+134
-101
lines changed

18 files changed

+134
-101
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/TermsReduceBenchmark.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public void setup() {
111111
dict[i] = new BytesRef(Long.toString(rand.nextLong()));
112112
}
113113
for (int i = 0; i < numShards; i++) {
114-
aggsList.add(InternalAggregations.from(Collections.singletonList(newTerms(rand, dict, true))));
114+
aggsList.add(InternalAggregations.from(newTerms(rand, dict, true)));
115115
}
116116
}
117117

@@ -124,7 +124,7 @@ private StringTerms newTerms(Random rand, BytesRef[] dict, boolean withNested) {
124124
for (BytesRef term : randomTerms) {
125125
InternalAggregations subAggs;
126126
if (withNested) {
127-
subAggs = InternalAggregations.from(Collections.singletonList(newTerms(rand, dict, false)));
127+
subAggs = InternalAggregations.from(newTerms(rand, dict, false));
128128
} else {
129129
subAggs = InternalAggregations.EMPTY;
130130
}

benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/bucket/terms/StringTermsSerializationBenchmark.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,13 @@ public class StringTermsSerializationBenchmark {
5050

5151
@Setup
5252
public void initResults() {
53-
results = DelayableWriteable.referencing(InternalAggregations.from(List.of(newTerms(true))));
53+
results = DelayableWriteable.referencing(InternalAggregations.from(newTerms(true)));
5454
}
5555

5656
private StringTerms newTerms(boolean withNested) {
5757
List<StringTerms.Bucket> resultBuckets = new ArrayList<>(buckets);
5858
for (int i = 0; i < buckets; i++) {
59-
InternalAggregations inner = withNested ? InternalAggregations.from(List.of(newTerms(false))) : InternalAggregations.EMPTY;
59+
InternalAggregations inner = withNested ? InternalAggregations.from(newTerms(false)) : InternalAggregations.EMPTY;
6060
resultBuckets.add(new StringTerms.Bucket(new BytesRef("test" + i), i, inner, false, 0, DocValueFormat.RAW));
6161
}
6262
return new StringTerms(

modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/histogram/InternalAutoDateHistogram.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,7 @@ private BucketReduceResult addEmptyBuckets(BucketReduceResult current, Aggregati
414414

415415
Bucket lastBucket = null;
416416
ListIterator<Bucket> iter = list.listIterator();
417-
InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(List.of(bucketInfo.emptySubAggregations), reduceContext);
417+
InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(bucketInfo.emptySubAggregations, reduceContext);
418418

419419
// Add the empty buckets within the data,
420420
// e.g. if the data series is [1,2,3,7] there're 3 empty buckets that will be created for 4,5,6

modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ public InternalAggregation get() {
222222
InternalBucket reducedBucket;
223223
if (bucketsWithSameKey.size() == 1) {
224224
reducedBucket = bucketsWithSameKey.get(0);
225-
reducedBucket.aggregations = InternalAggregations.reduce(List.of(reducedBucket.aggregations), reduceContext);
225+
reducedBucket.aggregations = InternalAggregations.reduce(reducedBucket.aggregations, reduceContext);
226226
} else {
227227
reducedBucket = reduceBucket(bucketsWithSameKey, reduceContext);
228228
}

modules/aggregations/src/main/java/org/elasticsearch/aggregations/pipeline/DerivativePipelineAggregator.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import java.util.ArrayList;
2323
import java.util.List;
2424
import java.util.Map;
25-
import java.util.stream.Collectors;
26-
import java.util.stream.StreamSupport;
2725

2826
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
2927

@@ -69,11 +67,16 @@ public InternalAggregation reduce(InternalAggregation aggregation, AggregationRe
6967
if (xAxisUnits != null) {
7068
xDiff = (thisBucketKey.doubleValue() - lastBucketKey.doubleValue()) / xAxisUnits;
7169
}
72-
final List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false)
73-
.collect(Collectors.toCollection(ArrayList::new));
74-
aggs.add(new Derivative(name(), gradient, xDiff, formatter, metadata()));
75-
Bucket newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), InternalAggregations.from(aggs));
76-
newBuckets.add(newBucket);
70+
newBuckets.add(
71+
factory.createBucket(
72+
factory.getKey(bucket),
73+
bucket.getDocCount(),
74+
InternalAggregations.append(
75+
bucket.getAggregations(),
76+
new Derivative(name(), gradient, xDiff, formatter, metadata())
77+
)
78+
)
79+
);
7780
} else {
7881
newBuckets.add(bucket);
7982
}

modules/aggregations/src/main/java/org/elasticsearch/aggregations/pipeline/MovFnPipelineAggregator.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
import java.util.HashMap;
2626
import java.util.List;
2727
import java.util.Map;
28-
import java.util.stream.Collectors;
29-
import java.util.stream.StreamSupport;
3028

3129
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
3230

@@ -117,12 +115,11 @@ public InternalAggregation reduce(InternalAggregation aggregation, AggregationRe
117115
vars,
118116
values.subList(fromIndex, toIndex).stream().mapToDouble(Double::doubleValue).toArray()
119117
);
120-
121-
List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false)
122-
.map(InternalAggregation.class::cast)
123-
.collect(Collectors.toCollection(ArrayList::new));
124-
aggs.add(new InternalSimpleValue(name(), result, formatter, metadata()));
125-
newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), InternalAggregations.from(aggs));
118+
newBucket = factory.createBucket(
119+
factory.getKey(bucket),
120+
bucket.getDocCount(),
121+
InternalAggregations.append(bucket.getAggregations(), new InternalSimpleValue(name(), result, formatter, metadata()))
122+
);
126123
index++;
127124
}
128125
newBuckets.add(newBucket);

server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.common.io.stream.StreamInput;
1313
import org.elasticsearch.common.io.stream.StreamOutput;
1414
import org.elasticsearch.common.io.stream.Writeable;
15+
import org.elasticsearch.common.util.CollectionUtils;
1516
import org.elasticsearch.common.util.Maps;
1617
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
1718
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
@@ -47,7 +48,7 @@ public final class InternalAggregations implements Iterable<InternalAggregation>
4748
/**
4849
* Constructs a new aggregation.
4950
*/
50-
public InternalAggregations(List<InternalAggregation> aggregations) {
51+
private InternalAggregations(List<InternalAggregation> aggregations) {
5152
this.aggregations = aggregations;
5253
if (aggregations.isEmpty()) {
5354
aggregationsAsMap = Map.of();
@@ -70,14 +71,15 @@ public List<InternalAggregation> asList() {
7071
}
7172

7273
private Map<String, InternalAggregation> asMap() {
73-
if (aggregationsAsMap == null) {
74+
var res = aggregationsAsMap;
75+
if (res == null) {
7476
Map<String, InternalAggregation> newAggregationsAsMap = Maps.newMapWithExpectedSize(aggregations.size());
7577
for (InternalAggregation aggregation : aggregations) {
7678
newAggregationsAsMap.put(aggregation.getName(), aggregation);
7779
}
78-
this.aggregationsAsMap = unmodifiableMap(newAggregationsAsMap);
80+
res = this.aggregationsAsMap = unmodifiableMap(newAggregationsAsMap);
7981
}
80-
return aggregationsAsMap;
82+
return res;
8183
}
8284

8385
/**
@@ -121,13 +123,27 @@ public XContentBuilder toXContentInternal(XContentBuilder builder, Params params
121123
return builder;
122124
}
123125

126+
public static InternalAggregations from(InternalAggregation aggregation) {
127+
return new InternalAggregations(List.of(aggregation));
128+
}
129+
124130
public static InternalAggregations from(List<InternalAggregation> aggregations) {
125131
if (aggregations.isEmpty()) {
126132
return EMPTY;
127133
}
134+
if (aggregations.size() == 1) {
135+
return from(aggregations.getFirst());
136+
}
128137
return new InternalAggregations(aggregations);
129138
}
130139

140+
public static InternalAggregations append(InternalAggregations aggs, InternalAggregation toAppend) {
141+
if (aggs.aggregations.isEmpty()) {
142+
return from(toAppend);
143+
}
144+
return new InternalAggregations(CollectionUtils.appendToCopyNoNullElements(aggs.aggregations, toAppend));
145+
}
146+
131147
public static InternalAggregations readFrom(StreamInput in) throws IOException {
132148
return from(in.readNamedWriteableCollectionAsList(InternalAggregation.class));
133149
}
@@ -227,19 +243,7 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
227243
}
228244
// handle special case when there is just one aggregation
229245
if (aggregationsList.size() == 1) {
230-
final List<InternalAggregation> internalAggregations = aggregationsList.get(0).asList();
231-
final List<InternalAggregation> reduced = new ArrayList<>(internalAggregations.size());
232-
for (InternalAggregation aggregation : internalAggregations) {
233-
if (aggregation.mustReduceOnSingleInternalAgg()) {
234-
try (AggregatorReducer aggregatorReducer = aggregation.getReducer(context.forAgg(aggregation.getName()), 1)) {
235-
aggregatorReducer.accept(aggregation);
236-
reduced.add(aggregatorReducer.get());
237-
}
238-
} else {
239-
reduced.add(aggregation);
240-
}
241-
}
242-
return from(reduced);
246+
return reduce(aggregationsList.getFirst(), context);
243247
}
244248
// general case
245249
try (AggregatorsReducer reducer = new AggregatorsReducer(aggregationsList.get(0), context, aggregationsList.size())) {
@@ -250,6 +254,29 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
250254
}
251255
}
252256

257+
public static InternalAggregations reduce(InternalAggregations aggregations, AggregationReduceContext context) {
258+
final List<InternalAggregation> internalAggregations = aggregations.asList();
259+
int size = internalAggregations.size();
260+
if (size == 0) {
261+
return EMPTY;
262+
}
263+
boolean noneReduced = true;
264+
final List<InternalAggregation> reduced = new ArrayList<>(size);
265+
for (int i = 0; i < size; i++) {
266+
InternalAggregation aggregation = internalAggregations.get(i);
267+
if (aggregation.mustReduceOnSingleInternalAgg()) {
268+
noneReduced = false;
269+
try (AggregatorReducer aggregatorReducer = aggregation.getReducer(context.forAgg(aggregation.getName()), 1)) {
270+
aggregatorReducer.accept(aggregation);
271+
reduced.add(aggregatorReducer.get());
272+
}
273+
} else {
274+
reduced.add(aggregation);
275+
}
276+
}
277+
return noneReduced ? aggregations : from(reduced);
278+
}
279+
253280
/**
254281
* Finalizes the sampling for all the internal aggregations
255282
* @param samplingContext the sampling context

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ public void accept(long key) {
377377
iterateEmptyBuckets(list, list.listIterator(), counter);
378378
reduceContext.consumeBucketsAndMaybeBreak(counter.size);
379379

380-
InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(List.of(emptyBucketInfo.subAggregations), reduceContext);
380+
InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(emptyBucketInfo.subAggregations, reduceContext);
381381
ListIterator<Bucket> iter = list.listIterator();
382382
iterateEmptyBuckets(list, iter, new LongConsumer() {
383383
private int size = 0;

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -349,10 +349,7 @@ public void accept(double key) {
349349
/*
350350
* Now that we're sure we have space we allocate all the buckets.
351351
*/
352-
InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(
353-
Collections.singletonList(emptyBucketInfo.subAggregations),
354-
reduceContext
355-
);
352+
InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(emptyBucketInfo.subAggregations, reduceContext);
356353
ListIterator<Bucket> iter = list.listIterator();
357354
iterateEmptyBuckets(list, iter, new DoubleConsumer() {
358355
private int size;

server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketScriptPipelineAggregator.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import java.util.HashMap;
2323
import java.util.List;
2424
import java.util.Map;
25-
import java.util.stream.Collectors;
26-
import java.util.stream.StreamSupport;
2725

2826
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
2927

@@ -80,13 +78,11 @@ public InternalAggregation reduce(InternalAggregation aggregation, AggregationRe
8078
if (returned == null) {
8179
newBuckets.add(bucket);
8280
} else {
83-
final List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false)
84-
.collect(Collectors.toCollection(ArrayList::new));
85-
86-
InternalSimpleValue simpleValue = new InternalSimpleValue(name(), returned.doubleValue(), formatter, metadata());
87-
aggs.add(simpleValue);
8881
InternalMultiBucketAggregation.InternalBucket newBucket = originalAgg.createBucket(
89-
InternalAggregations.from(aggs),
82+
InternalAggregations.append(
83+
bucket.getAggregations(),
84+
new InternalSimpleValue(name(), returned.doubleValue(), formatter, metadata())
85+
),
9086
bucket
9187
);
9288
newBuckets.add(newBucket);

0 commit comments

Comments
 (0)