Skip to content

Commit c3a72e9

Browse files
authored
Add test to exercise reduction of terms aggregation order by key and fix pruning bug (#106799) (#106814)
We are not computing the otherDocCounts properly as we are exiting the iteration too early so we are not counting the pruned buckets. This commit make sure we are counting all buckets.
1 parent 10d088d commit c3a72e9

File tree

3 files changed

+106
-20
lines changed

3 files changed

+106
-20
lines changed

docs/changelog/106799.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 106799
2+
summary: Add test to exercise reduction of terms aggregation order by key
3+
area: Aggregations
4+
type: bug
5+
issues: []

server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsIT.java

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.Map;
5050
import java.util.Set;
5151
import java.util.function.Function;
52+
import java.util.stream.Collectors;
5253

5354
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
5455
import static org.elasticsearch.search.aggregations.AggregationBuilders.avg;
@@ -1287,4 +1288,92 @@ public void testScriptWithValueType() throws Exception {
12871288
assertThat(ex.getCause().getMessage(), containsString("Unknown value type [foobar]"));
12881289
}
12891290
}
1291+
1292+
public void testOrderByKey() throws Exception {
1293+
Map<String, long[]> data = new HashMap<>();
1294+
for (int i = 0; i < 5; i++) {
1295+
assertAcked(
1296+
indicesAdmin().prepareCreate("idx" + i).setMapping(SINGLE_VALUED_FIELD_NAME, "type=keyword", "filter", "type=boolean")
1297+
);
1298+
List<IndexRequestBuilder> builders = new ArrayList<>();
1299+
for (int j = 0; j < 100; j++) {
1300+
String val = "val" + random().nextInt(1000);
1301+
boolean filter = randomBoolean();
1302+
long[] counter = data.computeIfAbsent(val, s -> new long[] { 0 });
1303+
if (filter == false) {
1304+
counter[0]++;
1305+
}
1306+
builders.add(
1307+
prepareIndex("idx" + i).setSource(
1308+
jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, val).field("filter", filter).endObject()
1309+
)
1310+
);
1311+
}
1312+
indexRandom(true, builders);
1313+
}
1314+
List<String> allKeys = new ArrayList<>(data.keySet());
1315+
List<String> keysMinDocCount1 = allKeys.stream().filter(key -> data.get(key)[0] > 0).collect(Collectors.toList());
1316+
List<String> keysMinDocCount2 = allKeys.stream().filter(key -> data.get(key)[0] > 1).collect(Collectors.toList());
1317+
// test for different batch sizes to exercise partial reduces
1318+
for (int batchReduceSize = 2; batchReduceSize < 6; batchReduceSize++) {
1319+
// with min_doc_count = 0
1320+
allKeys.sort(String::compareTo);
1321+
assertOrderByKeyResponse(allKeys, data, true, 0, batchReduceSize);
1322+
Collections.reverse(allKeys);
1323+
assertOrderByKeyResponse(allKeys, data, false, 0, batchReduceSize);
1324+
// with min_doc_count = 1
1325+
keysMinDocCount1.sort(String::compareTo);
1326+
assertOrderByKeyResponse(keysMinDocCount1, data, true, 1, batchReduceSize);
1327+
Collections.reverse(keysMinDocCount1);
1328+
assertOrderByKeyResponse(keysMinDocCount1, data, false, 1, batchReduceSize);
1329+
// with min_doc_count = 2
1330+
keysMinDocCount2.sort(String::compareTo);
1331+
assertOrderByKeyResponse(keysMinDocCount2, data, true, 2, batchReduceSize);
1332+
Collections.reverse(keysMinDocCount2);
1333+
assertOrderByKeyResponse(keysMinDocCount2, data, false, 2, batchReduceSize);
1334+
}
1335+
for (int i = 0; i < 5; i++) {
1336+
assertAcked(indicesAdmin().prepareDelete("idx" + i));
1337+
}
1338+
}
1339+
1340+
private void assertOrderByKeyResponse(
1341+
List<String> keys,
1342+
Map<String, long[]> counts,
1343+
boolean asc,
1344+
int minDocCount,
1345+
int batchReduceSize
1346+
) {
1347+
int size = randomIntBetween(1, keys.size());
1348+
long sumOtherCount = 0;
1349+
for (int i = size; i < keys.size(); i++) {
1350+
sumOtherCount += counts.get(keys.get(i))[0];
1351+
}
1352+
final long finalSumOtherCount = sumOtherCount;
1353+
assertNoFailuresAndResponse(
1354+
prepareSearch("idx0", "idx1", "idx2", "idx3", "idx4").setBatchedReduceSize(batchReduceSize)
1355+
.setQuery(QueryBuilders.termQuery("filter", false))
1356+
.addAggregation(
1357+
new TermsAggregationBuilder("terms").field(SINGLE_VALUED_FIELD_NAME)
1358+
.size(size)
1359+
.shardSize(500)
1360+
.minDocCount(minDocCount)
1361+
.order(BucketOrder.key(asc))
1362+
),
1363+
response -> {
1364+
StringTerms terms = response.getAggregations().get("terms");
1365+
assertThat(terms, notNullValue());
1366+
assertThat(terms.getName(), equalTo("terms"));
1367+
assertThat(terms.getBuckets().size(), equalTo(size));
1368+
assertThat(terms.getSumOfOtherDocCounts(), equalTo(finalSumOtherCount));
1369+
1370+
for (int i = 0; i < size; i++) {
1371+
StringTerms.Bucket bucket = terms.getBuckets().get(i);
1372+
assertThat(bucket, notNullValue());
1373+
assertThat(bucket.getKeyAsString(), equalTo(keys.get(i)));
1374+
assertThat(bucket.getDocCount(), equalTo(counts.get(keys.get(i))[0]));
1375+
}
1376+
}
1377+
);
1378+
}
12901379
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import java.util.List;
3131
import java.util.Map;
3232
import java.util.Optional;
33-
import java.util.function.Function;
33+
import java.util.function.Consumer;
3434

3535
import static org.elasticsearch.search.aggregations.InternalOrder.isKeyAsc;
3636
import static org.elasticsearch.search.aggregations.InternalOrder.isKeyOrder;
@@ -153,7 +153,7 @@ private long getDocCountError(A terms) {
153153
private BucketOrder reduceBuckets(
154154
List<InternalAggregation> aggregations,
155155
AggregationReduceContext reduceContext,
156-
Function<DelayedBucket<B>, Boolean> sink
156+
Consumer<DelayedBucket<B>> sink
157157
) {
158158
/*
159159
* Buckets returned by a partial reduce or a shard response are sorted by key since {@link Version#V_7_10_0}.
@@ -176,7 +176,7 @@ private void reduceMergeSort(
176176
List<InternalAggregation> aggregations,
177177
BucketOrder thisReduceOrder,
178178
AggregationReduceContext reduceContext,
179-
Function<DelayedBucket<B>, Boolean> sink
179+
Consumer<DelayedBucket<B>> sink
180180
) {
181181
assert isKeyOrder(thisReduceOrder);
182182
final Comparator<Bucket> cmp = thisReduceOrder.comparator();
@@ -201,12 +201,7 @@ protected boolean lessThan(IteratorAndCurrent<B> a, IteratorAndCurrent<B> b) {
201201
assert lastBucket == null || cmp.compare(top.current(), lastBucket) >= 0;
202202
if (lastBucket != null && cmp.compare(top.current(), lastBucket) != 0) {
203203
// the key changed so bundle up the last key's worth of buckets
204-
boolean shouldContinue = sink.apply(
205-
new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets)
206-
);
207-
if (false == shouldContinue) {
208-
return;
209-
}
204+
sink.accept(new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets));
210205
sameTermBuckets = new ArrayList<>();
211206
}
212207
lastBucket = top.current();
@@ -226,14 +221,14 @@ protected boolean lessThan(IteratorAndCurrent<B> a, IteratorAndCurrent<B> b) {
226221
}
227222

228223
if (sameTermBuckets.isEmpty() == false) {
229-
sink.apply(new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets));
224+
sink.accept(new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets));
230225
}
231226
}
232227

233228
private void reduceLegacy(
234229
List<InternalAggregation> aggregations,
235230
AggregationReduceContext reduceContext,
236-
Function<DelayedBucket<B>, Boolean> sink
231+
Consumer<DelayedBucket<B>> sink
237232
) {
238233
Map<Object, List<B>> bucketMap = new HashMap<>();
239234
for (InternalAggregation aggregation : aggregations) {
@@ -246,12 +241,7 @@ private void reduceLegacy(
246241
}
247242
}
248243
for (List<B> sameTermBuckets : bucketMap.values()) {
249-
boolean shouldContinue = sink.apply(
250-
new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets)
251-
);
252-
if (false == shouldContinue) {
253-
return;
254-
}
244+
sink.accept(new DelayedBucket<>(AbstractInternalTerms.this::reduceBucket, reduceContext, sameTermBuckets));
255245
}
256246
}
257247

@@ -304,7 +294,6 @@ public InternalAggregation doReduce(List<InternalAggregation> aggregations, Aggr
304294
if (bucket.getDocCount() >= getMinDocCount()) {
305295
top.add(bucket);
306296
}
307-
return true;
308297
});
309298
result = top.build();
310299
} else {
@@ -316,8 +305,11 @@ public InternalAggregation doReduce(List<InternalAggregation> aggregations, Aggr
316305
boolean canPrune = isKeyOrder(getOrder()) && getMinDocCount() == 0;
317306
result = new ArrayList<>();
318307
thisReduceOrder = reduceBuckets(aggregations, reduceContext, bucket -> {
319-
result.add(bucket.reduced());
320-
return false == canPrune || result.size() < getRequiredSize();
308+
if (canPrune == false || result.size() < getRequiredSize()) {
309+
result.add(bucket.reduced());
310+
} else {
311+
otherDocCount[0] += bucket.getDocCount();
312+
}
321313
});
322314
}
323315
for (B r : result) {

0 commit comments

Comments
 (0)