Skip to content

Commit f2d6c25

Browse files
authored
Allow terms to run as filters (#68871)
This adds yet another terms aggregator that uses `term` filters to run at a speed similar to the specialized low cardinality terms aggregator. It is mostly useful as a stepping stone for some additional optimizations that we can make later. So it doesn't have to be faster on its own. Just not *slower*. And it's about the same speed.
1 parent 4d1dc7f commit f2d6c25

File tree

14 files changed

+721
-420
lines changed

14 files changed

+721
-420
lines changed

server/src/main/java/org/elasticsearch/search/aggregations/AdaptingAggregator.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,19 @@ public AdaptingAggregator(
3737
/*
3838
* Lock the parent of the sub-aggregators to *this* instead of to
3939
* the delegate. This keeps the parent link shaped like the requested
40-
* agg tree. Thisis how it has always been and some aggs rely on it.
40+
* agg tree. The rate aggregator needs this or it will die.
4141
*/
4242
this.delegate = delegate.apply(subAggregators.fixParent(this));
43-
assert this.delegate.parent() == parent : "invalid parent set on delegate";
43+
if (this.delegate.parent() != parent) {
44+
throw new IllegalStateException("invalid parent set on delegate");
45+
}
4446
}
4547

4648
/**
4749
* Adapt the result from the collecting {@linkplain Aggregator} into the
4850
* result expected by this {@linkplain Aggregator}.
4951
*/
50-
protected abstract InternalAggregation adapt(InternalAggregation delegateResult);
52+
protected abstract InternalAggregation adapt(InternalAggregation delegateResult) throws IOException;
5153

5254
@Override
5355
public final void close() {
@@ -101,7 +103,12 @@ public final InternalAggregation[] buildAggregations(long[] owningBucketOrds) th
101103

102104
@Override
103105
public final InternalAggregation buildEmptyAggregation() {
104-
return adapt(delegate.buildEmptyAggregation());
106+
try {
107+
return adapt(delegate.buildEmptyAggregation());
108+
} catch (IOException e) {
109+
// We don't expect this to happen, but computers are funny.
110+
throw new AggregationExecutionException("io error while building empty agg", e);
111+
}
105112
}
106113

107114
@Override

server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregator.java

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -143,20 +143,8 @@ public static FiltersAggregator build(
143143
CardinalityUpperBound cardinality,
144144
Map<String, Object> metadata
145145
) throws IOException {
146-
FiltersAggregator filterOrder = buildFilterOrderOrNull(
147-
name,
148-
factories,
149-
keys,
150-
filters,
151-
keyed,
152-
otherBucketKey,
153-
context,
154-
parent,
155-
cardinality,
156-
metadata
157-
);
158-
if (filterOrder != null) {
159-
return filterOrder;
146+
if (canUseFilterByFilter(parent, factories, otherBucketKey)) {
147+
return buildFilterByFilter(name, factories, keys, filters, keyed, otherBucketKey, context, parent, cardinality, metadata);
160148
}
161149
return new FiltersAggregator.Compatible(
162150
name,
@@ -172,14 +160,22 @@ public static FiltersAggregator build(
172160
);
173161
}
174162

163+
/**
164+
* Can this aggregation be executed using the {@link FilterByFilter}? That
165+
* aggregator is much faster than the fallback {@link Compatible} aggregator.
166+
*/
167+
public static boolean canUseFilterByFilter(Aggregator parent, AggregatorFactories factories, String otherBucketKey) {
168+
return parent == null && factories.countAggregators() == 0 && otherBucketKey == null;
169+
}
170+
175171
/**
176172
* Build an {@link Aggregator} for a {@code filters} aggregation if we
177173
* can collect {@link FilterByFilter}, otherwise throw an {@link IllegalStateException}. We can
178174
* collect filter by filter if there isn't a parent, there aren't children,
179175
* and we don't collect "other" buckets. Collecting {@link FilterByFilter}
180176
* is generally going to be much faster than the {@link Compatible} aggregator.
181177
*/
182-
public static FilterByFilter buildFilterOrderOrNull(
178+
public static FilterByFilter buildFilterByFilter(
183179
String name,
184180
AggregatorFactories factories,
185181
String[] keys,
@@ -191,14 +187,8 @@ public static FilterByFilter buildFilterOrderOrNull(
191187
CardinalityUpperBound cardinality,
192188
Map<String, Object> metadata
193189
) throws IOException {
194-
if (parent != null) {
195-
return null;
196-
}
197-
if (factories.countAggregators() != 0) {
198-
return null;
199-
}
200-
if (otherBucketKey != null) {
201-
return null;
190+
if (false == canUseFilterByFilter(parent, factories, otherBucketKey)) {
191+
throw new IllegalStateException("Can't execute filter-by-filter");
202192
}
203193
return new FiltersAggregator.FilterByFilter(
204194
name,

server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.elasticsearch.common.io.stream.StreamInput;
1212
import org.elasticsearch.common.io.stream.StreamOutput;
1313
import org.elasticsearch.common.xcontent.XContentBuilder;
14-
import org.elasticsearch.search.aggregations.Aggregations;
1514
import org.elasticsearch.search.aggregations.InternalAggregation;
1615
import org.elasticsearch.search.aggregations.InternalAggregations;
1716
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
@@ -71,7 +70,7 @@ public long getDocCount() {
7170
}
7271

7372
@Override
74-
public Aggregations getAggregations() {
73+
public InternalAggregations getAggregations() {
7574
return aggregations;
7675
}
7776

server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,9 @@ public static FromFilters<?> adaptIntoFiltersOrNull(
351351
// We don't generate sensible Queries for nanoseconds.
352352
return null;
353353
}
354+
if (false == FiltersAggregator.canUseFilterByFilter(parent, factories, null)) {
355+
return null;
356+
}
354357
boolean wholeNumbersOnly = false == ((ValuesSource.Numeric) valuesSourceConfig.getValuesSource()).isFloatingPoint();
355358
String[] keys = new String[ranges.length];
356359
Query[] filters = new Query[ranges.length];
@@ -382,29 +385,22 @@ public static FromFilters<?> adaptIntoFiltersOrNull(
382385
builder.to(ranges[i].to == Double.POSITIVE_INFINITY ? null : format.format(ranges[i].to)).includeUpper(false);
383386
filters[i] = context.buildQuery(builder);
384387
}
385-
FiltersAggregator.FilterByFilter delegate = FiltersAggregator.buildFilterOrderOrNull(
386-
name,
387-
factories,
388-
keys,
389-
filters,
390-
false,
391-
null,
392-
context,
393-
parent,
394-
cardinality,
395-
metadata
396-
);
397-
if (delegate == null) {
398-
return null;
399-
}
400388
RangeAggregator.FromFilters<?> fromFilters = new RangeAggregator.FromFilters<>(
401389
parent,
402390
factories,
403391
subAggregators -> {
404-
if (subAggregators.countAggregators() > 0) {
405-
throw new IllegalStateException("didn't expect to have a delegate if there are child aggs");
406-
}
407-
return delegate;
392+
return FiltersAggregator.buildFilterByFilter(
393+
name,
394+
subAggregators,
395+
keys,
396+
filters,
397+
false,
398+
null,
399+
context,
400+
parent,
401+
cardinality,
402+
metadata
403+
);
408404
},
409405
valuesSourceConfig.format(),
410406
ranges,

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package org.elasticsearch.search.aggregations.bucket.terms;
1010

1111
import org.apache.lucene.index.DocValues;
12-
import org.apache.lucene.index.IndexReader;
1312
import org.apache.lucene.index.LeafReaderContext;
1413
import org.apache.lucene.index.SortedDocValues;
1514
import org.apache.lucene.index.SortedSetDocValues;
@@ -72,10 +71,11 @@ public GlobalOrdinalsStringTermsAggregator(
7271
AggregatorFactories factories,
7372
Function<GlobalOrdinalsStringTermsAggregator, ResultStrategy<?, ?, ?>> resultStrategy,
7473
ValuesSource.Bytes.WithOrdinals valuesSource,
74+
SortedSetDocValues values,
7575
BucketOrder order,
7676
DocValueFormat format,
7777
BucketCountThresholds bucketCountThresholds,
78-
IncludeExclude.OrdinalsFilter includeExclude,
78+
LongPredicate acceptedOrds,
7979
AggregationContext context,
8080
Aggregator parent,
8181
boolean remapGlobalOrds,
@@ -87,12 +87,9 @@ public GlobalOrdinalsStringTermsAggregator(
8787
super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata);
8888
this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
8989
this.valuesSource = valuesSource;
90-
final IndexReader reader = context.searcher().getIndexReader();
91-
final SortedSetDocValues values = reader.leaves().size() > 0 ?
92-
valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0)) : DocValues.emptySortedSet();
9390
this.valueCount = values.getValueCount();
9491
this.lookupGlobalOrd = values::lookupOrd;
95-
this.acceptedGlobalOrdinals = includeExclude == null ? ALWAYS_TRUE : includeExclude.acceptedGlobalOrdinals(values)::get;
92+
this.acceptedGlobalOrdinals = acceptedOrds;
9693
if (remapGlobalOrds) {
9794
this.collectionStrategy = new RemapGlobalOrds(cardinality);
9895
} else {
@@ -267,6 +264,7 @@ static class LowCardinality extends GlobalOrdinalsStringTermsAggregator {
267264
AggregatorFactories factories,
268265
Function<GlobalOrdinalsStringTermsAggregator, ResultStrategy<?, ?, ?>> resultStrategy,
269266
ValuesSource.Bytes.WithOrdinals valuesSource,
267+
SortedSetDocValues values,
270268
BucketOrder order,
271269
DocValueFormat format,
272270
BucketCountThresholds bucketCountThresholds,
@@ -277,8 +275,24 @@ static class LowCardinality extends GlobalOrdinalsStringTermsAggregator {
277275
boolean showTermDocCountError,
278276
Map<String, Object> metadata
279277
) throws IOException {
280-
super(name, factories, resultStrategy, valuesSource, order, format, bucketCountThresholds, null, context,
281-
parent, remapGlobalOrds, collectionMode, showTermDocCountError, CardinalityUpperBound.ONE, metadata);
278+
super(
279+
name,
280+
factories,
281+
resultStrategy,
282+
valuesSource,
283+
values,
284+
order,
285+
format,
286+
bucketCountThresholds,
287+
l -> true,
288+
context,
289+
parent,
290+
remapGlobalOrds,
291+
collectionMode,
292+
showTermDocCountError,
293+
CardinalityUpperBound.ONE,
294+
metadata
295+
);
282296
assert factories == null || factories.countAggregators() == 0;
283297
this.segmentDocCounts = context.bigArrays().newLongArray(1, true);
284298
}
@@ -907,5 +921,5 @@ private void oversizedCopy(BytesRef from, BytesRef to) {
907921
/**
908922
* Predicate used for {@link #acceptedGlobalOrdinals} if there is no filter.
909923
*/
910-
private static final LongPredicate ALWAYS_TRUE = l -> true;
924+
static final LongPredicate ALWAYS_TRUE = l -> true;
911925
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.elasticsearch.search.aggregations.bucket.terms;
1010

11+
import org.apache.lucene.index.SortedSetDocValues;
1112
import org.elasticsearch.common.ParseField;
1213
import org.elasticsearch.common.logging.DeprecationCategory;
1314
import org.elasticsearch.common.logging.DeprecationLogger;
@@ -286,7 +287,6 @@ Aggregator create(String name,
286287
CardinalityUpperBound cardinality,
287288
Map<String, Object> metadata) throws IOException {
288289

289-
final IncludeExclude.OrdinalsFilter filter = includeExclude == null ? null : includeExclude.convertToOrdinalsFilter(format);
290290
boolean remapGlobalOrd = true;
291291
if (cardinality == CardinalityUpperBound.ONE && factories == AggregatorFactories.EMPTY && includeExclude == null) {
292292
/*
@@ -298,15 +298,18 @@ Aggregator create(String name,
298298
remapGlobalOrd = false;
299299
}
300300

301+
ValuesSource.Bytes.WithOrdinals.FieldData ordinalsValuesSource = (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource;
302+
SortedSetDocValues values = TermsAggregatorFactory.globalOrdsValues(context, ordinalsValuesSource);
301303
return new GlobalOrdinalsStringTermsAggregator(
302304
name,
303305
factories,
304306
a -> a.new SignificantTermsResults(lookup, significanceHeuristic, cardinality),
305-
(ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource,
307+
ordinalsValuesSource,
308+
values,
306309
null,
307310
format,
308311
bucketCountThresholds,
309-
filter,
312+
TermsAggregatorFactory.gloabalOrdsFilter(includeExclude, format, values),
310313
context,
311314
parent,
312315
remapGlobalOrd,

0 commit comments

Comments
 (0)