Skip to content

Commit d8ac6a4

Browse files
Optimize InternalAggregations construction a little (elastic#120868) (elastic#126516)
We can streamline and optimize this logic a little to see less copying and more compact results.
1 parent 4a8b1c4 commit d8ac6a4

File tree

17 files changed

+132
-99
lines changed

17 files changed

+132
-99
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/TermsReduceBenchmark.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public void setup() {
111111
dict[i] = new BytesRef(Long.toString(rand.nextLong()));
112112
}
113113
for (int i = 0; i < numShards; i++) {
114-
aggsList.add(InternalAggregations.from(Collections.singletonList(newTerms(rand, dict, true))));
114+
aggsList.add(InternalAggregations.from(newTerms(rand, dict, true)));
115115
}
116116
}
117117

@@ -124,7 +124,7 @@ private StringTerms newTerms(Random rand, BytesRef[] dict, boolean withNested) {
124124
for (BytesRef term : randomTerms) {
125125
InternalAggregations subAggs;
126126
if (withNested) {
127-
subAggs = InternalAggregations.from(Collections.singletonList(newTerms(rand, dict, false)));
127+
subAggs = InternalAggregations.from(newTerms(rand, dict, false));
128128
} else {
129129
subAggs = InternalAggregations.EMPTY;
130130
}

benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/bucket/terms/StringTermsSerializationBenchmark.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,13 @@ public class StringTermsSerializationBenchmark {
5050

5151
@Setup
5252
public void initResults() {
53-
results = DelayableWriteable.referencing(InternalAggregations.from(List.of(newTerms(true))));
53+
results = DelayableWriteable.referencing(InternalAggregations.from(newTerms(true)));
5454
}
5555

5656
private StringTerms newTerms(boolean withNested) {
5757
List<StringTerms.Bucket> resultBuckets = new ArrayList<>(buckets);
5858
for (int i = 0; i < buckets; i++) {
59-
InternalAggregations inner = withNested ? InternalAggregations.from(List.of(newTerms(false))) : InternalAggregations.EMPTY;
59+
InternalAggregations inner = withNested ? InternalAggregations.from(newTerms(false)) : InternalAggregations.EMPTY;
6060
resultBuckets.add(new StringTerms.Bucket(new BytesRef("test" + i), i, inner, false, 0, DocValueFormat.RAW));
6161
}
6262
return new StringTerms(

modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/histogram/InternalAutoDateHistogram.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,7 @@ private BucketReduceResult addEmptyBuckets(BucketReduceResult current, Aggregati
414414

415415
Bucket lastBucket = null;
416416
ListIterator<Bucket> iter = list.listIterator();
417-
InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(List.of(bucketInfo.emptySubAggregations), reduceContext);
417+
InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(bucketInfo.emptySubAggregations, reduceContext);
418418

419419
// Add the empty buckets within the data,
420420
// e.g. if the data series is [1,2,3,7] there're 3 empty buckets that will be created for 4,5,6

modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ public InternalAggregation get() {
222222
InternalBucket reducedBucket;
223223
if (bucketsWithSameKey.size() == 1) {
224224
reducedBucket = bucketsWithSameKey.get(0);
225-
reducedBucket.aggregations = InternalAggregations.reduce(List.of(reducedBucket.aggregations), reduceContext);
225+
reducedBucket.aggregations = InternalAggregations.reduce(reducedBucket.aggregations, reduceContext);
226226
} else {
227227
reducedBucket = reduceBucket(bucketsWithSameKey, reduceContext);
228228
}

modules/aggregations/src/main/java/org/elasticsearch/aggregations/pipeline/DerivativePipelineAggregator.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import java.util.ArrayList;
2323
import java.util.List;
2424
import java.util.Map;
25-
import java.util.stream.Collectors;
26-
import java.util.stream.StreamSupport;
2725

2826
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
2927

@@ -69,11 +67,16 @@ public InternalAggregation reduce(InternalAggregation aggregation, AggregationRe
6967
if (xAxisUnits != null) {
7068
xDiff = (thisBucketKey.doubleValue() - lastBucketKey.doubleValue()) / xAxisUnits;
7169
}
72-
final List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false)
73-
.collect(Collectors.toCollection(ArrayList::new));
74-
aggs.add(new Derivative(name(), gradient, xDiff, formatter, metadata()));
75-
Bucket newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), InternalAggregations.from(aggs));
76-
newBuckets.add(newBucket);
70+
newBuckets.add(
71+
factory.createBucket(
72+
factory.getKey(bucket),
73+
bucket.getDocCount(),
74+
InternalAggregations.append(
75+
bucket.getAggregations(),
76+
new Derivative(name(), gradient, xDiff, formatter, metadata())
77+
)
78+
)
79+
);
7780
} else {
7881
newBuckets.add(bucket);
7982
}

modules/aggregations/src/main/java/org/elasticsearch/aggregations/pipeline/MovFnPipelineAggregator.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
import java.util.HashMap;
2626
import java.util.List;
2727
import java.util.Map;
28-
import java.util.stream.Collectors;
29-
import java.util.stream.StreamSupport;
3028

3129
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
3230

@@ -117,12 +115,11 @@ public InternalAggregation reduce(InternalAggregation aggregation, AggregationRe
117115
vars,
118116
values.subList(fromIndex, toIndex).stream().mapToDouble(Double::doubleValue).toArray()
119117
);
120-
121-
List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false)
122-
.map(InternalAggregation.class::cast)
123-
.collect(Collectors.toCollection(ArrayList::new));
124-
aggs.add(new InternalSimpleValue(name(), result, formatter, metadata()));
125-
newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), InternalAggregations.from(aggs));
118+
newBucket = factory.createBucket(
119+
factory.getKey(bucket),
120+
bucket.getDocCount(),
121+
InternalAggregations.append(bucket.getAggregations(), new InternalSimpleValue(name(), result, formatter, metadata()))
122+
);
126123
index++;
127124
}
128125
newBuckets.add(newBucket);

server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java

Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.common.io.stream.StreamInput;
1515
import org.elasticsearch.common.io.stream.StreamOutput;
1616
import org.elasticsearch.common.io.stream.Writeable;
17+
import org.elasticsearch.common.util.CollectionUtils;
1718
import org.elasticsearch.common.util.Maps;
1819
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
1920
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
@@ -75,14 +76,15 @@ public List<InternalAggregation> asList() {
7576
}
7677

7778
private Map<String, InternalAggregation> asMap() {
78-
if (aggregationsAsMap == null) {
79+
var res = aggregationsAsMap;
80+
if (res == null) {
7981
Map<String, InternalAggregation> newAggregationsAsMap = Maps.newMapWithExpectedSize(aggregations.size());
8082
for (InternalAggregation aggregation : aggregations) {
8183
newAggregationsAsMap.put(aggregation.getName(), aggregation);
8284
}
83-
this.aggregationsAsMap = unmodifiableMap(newAggregationsAsMap);
85+
res = this.aggregationsAsMap = unmodifiableMap(newAggregationsAsMap);
8486
}
85-
return aggregationsAsMap;
87+
return res;
8688
}
8789

8890
/**
@@ -147,13 +149,27 @@ public static InternalAggregations fromXContent(XContentParser parser) throws IO
147149
return new InternalAggregations(aggregations);
148150
}
149151

152+
public static InternalAggregations from(InternalAggregation aggregation) {
153+
return new InternalAggregations(List.of(aggregation));
154+
}
155+
150156
public static InternalAggregations from(List<InternalAggregation> aggregations) {
151157
if (aggregations.isEmpty()) {
152158
return EMPTY;
153159
}
160+
if (aggregations.size() == 1) {
161+
return from(aggregations.get(0));
162+
}
154163
return new InternalAggregations(aggregations);
155164
}
156165

166+
public static InternalAggregations append(InternalAggregations aggs, InternalAggregation toAppend) {
167+
if (aggs.aggregations.isEmpty()) {
168+
return from(toAppend);
169+
}
170+
return new InternalAggregations(CollectionUtils.appendToCopyNoNullElements(aggs.aggregations, toAppend));
171+
}
172+
157173
public static InternalAggregations readFrom(StreamInput in) throws IOException {
158174
return from(in.readNamedWriteableCollectionAsList(InternalAggregation.class));
159175
}
@@ -253,19 +269,7 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
253269
}
254270
// handle special case when there is just one aggregation
255271
if (aggregationsList.size() == 1) {
256-
final List<InternalAggregation> internalAggregations = aggregationsList.get(0).asList();
257-
final List<InternalAggregation> reduced = new ArrayList<>(internalAggregations.size());
258-
for (InternalAggregation aggregation : internalAggregations) {
259-
if (aggregation.mustReduceOnSingleInternalAgg()) {
260-
try (AggregatorReducer aggregatorReducer = aggregation.getReducer(context.forAgg(aggregation.getName()), 1)) {
261-
aggregatorReducer.accept(aggregation);
262-
reduced.add(aggregatorReducer.get());
263-
}
264-
} else {
265-
reduced.add(aggregation);
266-
}
267-
}
268-
return from(reduced);
272+
return reduce(aggregationsList.get(0), context);
269273
}
270274
// general case
271275
try (AggregatorsReducer reducer = new AggregatorsReducer(aggregationsList.get(0), context, aggregationsList.size())) {
@@ -276,6 +280,29 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
276280
}
277281
}
278282

283+
public static InternalAggregations reduce(InternalAggregations aggregations, AggregationReduceContext context) {
284+
final List<InternalAggregation> internalAggregations = aggregations.asList();
285+
int size = internalAggregations.size();
286+
if (size == 0) {
287+
return EMPTY;
288+
}
289+
boolean noneReduced = true;
290+
final List<InternalAggregation> reduced = new ArrayList<>(size);
291+
for (int i = 0; i < size; i++) {
292+
InternalAggregation aggregation = internalAggregations.get(i);
293+
if (aggregation.mustReduceOnSingleInternalAgg()) {
294+
noneReduced = false;
295+
try (AggregatorReducer aggregatorReducer = aggregation.getReducer(context.forAgg(aggregation.getName()), 1)) {
296+
aggregatorReducer.accept(aggregation);
297+
reduced.add(aggregatorReducer.get());
298+
}
299+
} else {
300+
reduced.add(aggregation);
301+
}
302+
}
303+
return noneReduced ? aggregations : from(reduced);
304+
}
305+
279306
/**
280307
* Finalizes the sampling for all the internal aggregations
281308
* @param samplingContext the sampling context

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ public void accept(long key) {
377377
iterateEmptyBuckets(list, list.listIterator(), counter);
378378
reduceContext.consumeBucketsAndMaybeBreak(counter.size);
379379

380-
InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(List.of(emptyBucketInfo.subAggregations), reduceContext);
380+
InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(emptyBucketInfo.subAggregations, reduceContext);
381381
ListIterator<Bucket> iter = list.listIterator();
382382
iterateEmptyBuckets(list, iter, new LongConsumer() {
383383
private int size = 0;

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -349,10 +349,7 @@ public void accept(double key) {
349349
/*
350350
* Now that we're sure we have space we allocate all the buckets.
351351
*/
352-
InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(
353-
Collections.singletonList(emptyBucketInfo.subAggregations),
354-
reduceContext
355-
);
352+
InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(emptyBucketInfo.subAggregations, reduceContext);
356353
ListIterator<Bucket> iter = list.listIterator();
357354
iterateEmptyBuckets(list, iter, new DoubleConsumer() {
358355
private int size;

server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketScriptPipelineAggregator.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import java.util.HashMap;
2323
import java.util.List;
2424
import java.util.Map;
25-
import java.util.stream.Collectors;
26-
import java.util.stream.StreamSupport;
2725

2826
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
2927

@@ -80,13 +78,11 @@ public InternalAggregation reduce(InternalAggregation aggregation, AggregationRe
8078
if (returned == null) {
8179
newBuckets.add(bucket);
8280
} else {
83-
final List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false)
84-
.collect(Collectors.toCollection(ArrayList::new));
85-
86-
InternalSimpleValue simpleValue = new InternalSimpleValue(name(), returned.doubleValue(), formatter, metadata());
87-
aggs.add(simpleValue);
8881
InternalMultiBucketAggregation.InternalBucket newBucket = originalAgg.createBucket(
89-
InternalAggregations.from(aggs),
82+
InternalAggregations.append(
83+
bucket.getAggregations(),
84+
new InternalSimpleValue(name(), returned.doubleValue(), formatter, metadata())
85+
),
9086
bucket
9187
);
9288
newBuckets.add(newBucket);

0 commit comments

Comments
 (0)