Skip to content

Commit 3e2fa09

Browse files
committed
Fix merging of terms aggregation with compound order (#64469)
This change fixes a bug introduced in #61779 that uses a compound order to compare buckets when merging. The bug is triggered when the compound order uses a primary sort ordered by key (asc or desc). This commit ensures that we always extract the primary sort when comparing keys during merging. The PR is marked as no-issue since the bug has not been released in any official version.
1 parent a3d9408 commit 3e2fa09

File tree

5 files changed

+133
-82
lines changed

5 files changed

+133
-82
lines changed

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.Map;
4646
import java.util.Objects;
4747

48+
import static org.elasticsearch.search.aggregations.InternalOrder.isKeyAsc;
4849
import static org.elasticsearch.search.aggregations.InternalOrder.isKeyOrder;
4950

5051
public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends InternalTerms.Bucket<B>>
@@ -257,9 +258,9 @@ private long getDocCountError(InternalTerms<?, ?> terms) {
257258
}
258259

259260
private List<B> reduceMergeSort(List<InternalAggregation> aggregations,
260-
BucketOrder reduceOrder, ReduceContext reduceContext) {
261-
assert isKeyOrder(reduceOrder);
262-
final Comparator<MultiBucketsAggregation.Bucket> cmp = reduceOrder.comparator();
261+
BucketOrder thisReduceOrder, ReduceContext reduceContext) {
262+
assert isKeyOrder(thisReduceOrder);
263+
final Comparator<MultiBucketsAggregation.Bucket> cmp = thisReduceOrder.comparator();
263264
final PriorityQueue<IteratorAndCurrent<B>> pq = new PriorityQueue<IteratorAndCurrent<B>>(aggregations.size()) {
264265
@Override
265266
protected boolean lessThan(IteratorAndCurrent<B> a, IteratorAndCurrent<B> b) {
@@ -369,15 +370,22 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
369370
bucket.docCountError -= thisAggDocCountError;
370371
}
371372
}
373+
374+
final List<B> reducedBuckets;
372375
/**
373376
* Buckets returned by a partial reduce or a shard response are sorted by key since {@link Version#V_7_10_0}.
374377
* That allows to perform a merge sort when reducing multiple aggregations together.
375378
* For backward compatibility, we disable the merge sort and use ({@link InternalTerms#reduceLegacy} if any of
376379
* the provided aggregations use a different {@link InternalTerms#reduceOrder}.
377380
*/
378381
BucketOrder thisReduceOrder = getReduceOrder(aggregations);
379-
List<B> reducedBuckets = isKeyOrder(thisReduceOrder) ?
380-
reduceMergeSort(aggregations, thisReduceOrder, reduceContext) : reduceLegacy(aggregations, reduceContext);
382+
if (isKeyOrder(thisReduceOrder)) {
383+
// extract the primary sort in case this is a compound order.
384+
thisReduceOrder = InternalOrder.key(isKeyAsc(thisReduceOrder) ? true : false);
385+
reducedBuckets = reduceMergeSort(aggregations, thisReduceOrder, reduceContext);
386+
} else {
387+
reducedBuckets = reduceLegacy(aggregations, reduceContext);
388+
}
381389
final B[] list;
382390
if (reduceContext.isFinalReduce()) {
383391
final int size = Math.min(requiredSize, reducedBuckets.size());

server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTermsTestCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@
3535

3636
public abstract class InternalTermsTestCase extends InternalMultiBucketAggregationTestCase<InternalTerms<?, ?>> {
3737

38-
private boolean showDocCount;
39-
private long docCountError;
38+
protected boolean showDocCount;
39+
protected long docCountError;
4040

4141
@Before
4242
public void init() {

server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsTests.java

Lines changed: 108 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -34,31 +34,24 @@
3434
import java.util.Set;
3535

3636
public class StringTermsTests extends InternalTermsTestCase {
37-
3837
@Override
3938
protected InternalTerms<?, ?> createTestInstance(String name,
4039
Map<String, Object> metadata,
4140
InternalAggregations aggregations,
4241
boolean showTermDocCountError,
4342
long docCountError) {
44-
BucketOrder order = BucketOrder.count(false);
45-
long minDocCount = 1;
46-
int requiredSize = 3;
47-
int shardSize = requiredSize + 2;
48-
DocValueFormat format = DocValueFormat.RAW;
49-
long otherDocCount = 0;
50-
List<StringTerms.Bucket> buckets = new ArrayList<>();
51-
final int numBuckets = randomNumberOfBuckets();
52-
Set<BytesRef> terms = new HashSet<>();
53-
for (int i = 0; i < numBuckets; ++i) {
54-
BytesRef term = randomValueOtherThanMany(b -> terms.add(b) == false, () -> new BytesRef(randomAlphaOfLength(10)));
55-
int docCount = randomIntBetween(1, 100);
56-
buckets.add(new StringTerms.Bucket(term, docCount, aggregations, showTermDocCountError, docCountError, format));
43+
return createTestInstance(generateRandomDict(), name, metadata, aggregations, showTermDocCountError, docCountError);
44+
}
45+
46+
@Override
47+
protected List<InternalTerms<?, ?>> randomResultsToReduce(String name, int size) {
48+
List<InternalTerms<?, ?>> inputs = new ArrayList<>();
49+
BytesRef[] dict = generateRandomDict();
50+
for (int i = 0; i < size; i++) {
51+
InternalTerms<?, ?> t = randomBoolean() ? createUnmappedInstance(name) : createTestInstance(dict, name);
52+
inputs.add(t);
5753
}
58-
BucketOrder reduceOrder = rarely() ? order : BucketOrder.key(true);
59-
Collections.sort(buckets, reduceOrder.comparator());
60-
return new StringTerms(name, reduceOrder, order, requiredSize, minDocCount,
61-
metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError);
54+
return inputs;
6255
}
6356

6457
@Override
@@ -82,74 +75,116 @@ protected Class<? extends ParsedMultiBucketAggregation> implementationClass() {
8275
long docCountError = stringTerms.getDocCountError();
8376
Map<String, Object> metadata = stringTerms.getMetadata();
8477
switch (between(0, 8)) {
85-
case 0:
86-
name += randomAlphaOfLength(5);
87-
break;
88-
case 1:
89-
requiredSize += between(1, 100);
90-
break;
91-
case 2:
92-
minDocCount += between(1, 100);
93-
break;
94-
case 3:
95-
shardSize += between(1, 100);
96-
break;
97-
case 4:
98-
showTermDocCountError = showTermDocCountError == false;
99-
break;
100-
case 5:
101-
otherDocCount += between(1, 100);
102-
break;
103-
case 6:
104-
docCountError += between(1, 100);
105-
break;
106-
case 7:
107-
buckets = new ArrayList<>(buckets);
108-
buckets.add(new StringTerms.Bucket(new BytesRef(randomAlphaOfLengthBetween(1, 10)), randomNonNegativeLong(),
78+
case 0:
79+
name += randomAlphaOfLength(5);
80+
break;
81+
case 1:
82+
requiredSize += between(1, 100);
83+
break;
84+
case 2:
85+
minDocCount += between(1, 100);
86+
break;
87+
case 3:
88+
shardSize += between(1, 100);
89+
break;
90+
case 4:
91+
showTermDocCountError = showTermDocCountError == false;
92+
break;
93+
case 5:
94+
otherDocCount += between(1, 100);
95+
break;
96+
case 6:
97+
docCountError += between(1, 100);
98+
break;
99+
case 7:
100+
buckets = new ArrayList<>(buckets);
101+
buckets.add(new StringTerms.Bucket(new BytesRef(randomAlphaOfLengthBetween(1, 10)), randomNonNegativeLong(),
109102
InternalAggregations.EMPTY, showTermDocCountError, docCountError, format));
110-
break;
111-
case 8:
112-
if (metadata == null) {
113-
metadata = new HashMap<>(1);
114-
} else {
115-
metadata = new HashMap<>(instance.getMetadata());
116-
}
117-
metadata.put(randomAlphaOfLength(15), randomInt());
118-
break;
119-
default:
120-
throw new AssertionError("Illegal randomisation branch");
103+
break;
104+
case 8:
105+
if (metadata == null) {
106+
metadata = new HashMap<>(1);
107+
} else {
108+
metadata = new HashMap<>(instance.getMetadata());
109+
}
110+
metadata.put(randomAlphaOfLength(15), randomInt());
111+
break;
112+
default:
113+
throw new AssertionError("Illegal randomisation branch");
121114
}
122115
Collections.sort(buckets, stringTerms.reduceOrder.comparator());
123116
return new StringTerms(name, stringTerms.reduceOrder, order, requiredSize, minDocCount, metadata, format, shardSize,
124-
showTermDocCountError, otherDocCount, buckets, docCountError);
117+
showTermDocCountError, otherDocCount, buckets, docCountError);
125118
} else {
126119
String name = instance.getName();
127120
BucketOrder order = instance.order;
128121
int requiredSize = instance.requiredSize;
129122
long minDocCount = instance.minDocCount;
130123
Map<String, Object> metadata = instance.getMetadata();
131124
switch (between(0, 3)) {
132-
case 0:
133-
name += randomAlphaOfLength(5);
134-
break;
135-
case 1:
136-
requiredSize += between(1, 100);
137-
break;
138-
case 2:
139-
minDocCount += between(1, 100);
140-
break;
141-
case 3:
142-
if (metadata == null) {
143-
metadata = new HashMap<>(1);
144-
} else {
145-
metadata = new HashMap<>(instance.getMetadata());
146-
}
147-
metadata.put(randomAlphaOfLength(15), randomInt());
148-
break;
149-
default:
150-
throw new AssertionError("Illegal randomisation branch");
125+
case 0:
126+
name += randomAlphaOfLength(5);
127+
break;
128+
case 1:
129+
requiredSize += between(1, 100);
130+
break;
131+
case 2:
132+
minDocCount += between(1, 100);
133+
break;
134+
case 3:
135+
if (metadata == null) {
136+
metadata = new HashMap<>(1);
137+
} else {
138+
metadata = new HashMap<>(instance.getMetadata());
139+
}
140+
metadata.put(randomAlphaOfLength(15), randomInt());
141+
break;
142+
default:
143+
throw new AssertionError("Illegal randomisation branch");
151144
}
152145
return new UnmappedTerms(name, order, requiredSize, minDocCount, metadata);
153146
}
154147
}
148+
149+
private BytesRef[] generateRandomDict() {
150+
Set<BytesRef> terms = new HashSet<>();
151+
int numTerms = randomIntBetween(2, 100);
152+
for (int i = 0; i < numTerms; i++) {
153+
terms.add(new BytesRef(randomAlphaOfLength(10)));
154+
}
155+
return terms.stream().toArray(BytesRef[]::new);
156+
}
157+
158+
private InternalTerms<?, ?> createTestInstance(BytesRef[] dict, String name) {
159+
return createTestInstance(dict, name, createTestMetadata(), createSubAggregations(), showDocCount, docCountError);
160+
}
161+
162+
private InternalTerms<?, ?> createTestInstance(BytesRef[] dict,
163+
String name,
164+
Map<String, Object> metadata,
165+
InternalAggregations aggregations,
166+
boolean showTermDocCountError,
167+
long docCountError) {
168+
BucketOrder order = BucketOrder.count(false);
169+
long minDocCount = 1;
170+
int requiredSize = 3;
171+
int shardSize = requiredSize + 2;
172+
DocValueFormat format = DocValueFormat.RAW;
173+
long otherDocCount = 0;
174+
List<StringTerms.Bucket> buckets = new ArrayList<>();
175+
final int numBuckets = randomNumberOfBuckets();
176+
Set<BytesRef> terms = new HashSet<>();
177+
for (int i = 0; i < numBuckets; ++i) {
178+
BytesRef term = dict[randomIntBetween(0, dict.length-1)];
179+
if (terms.add(term)) {
180+
int docCount = randomIntBetween(1, 100);
181+
buckets.add(new StringTerms.Bucket(term, docCount, aggregations, showTermDocCountError, docCountError, format));
182+
}
183+
}
184+
BucketOrder reduceOrder = randomBoolean() ?
185+
BucketOrder.compound(BucketOrder.key(true), BucketOrder.count(false)) : BucketOrder.key(true);
186+
Collections.sort(buckets, reduceOrder.comparator());
187+
return new StringTerms(name, reduceOrder, order, requiredSize, minDocCount,
188+
metadata, format, shardSize, showTermDocCountError, otherDocCount, buckets, docCountError);
189+
}
155190
}

test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ public final T createTestInstance() {
419419
return createTestInstance(randomAlphaOfLength(5));
420420
}
421421

422-
private T createTestInstance(String name) {
422+
public final Map<String, Object> createTestMetadata() {
423423
Map<String, Object> metadata = null;
424424
if (randomBoolean()) {
425425
metadata = new HashMap<>();
@@ -428,7 +428,11 @@ private T createTestInstance(String name) {
428428
metadata.put(randomAlphaOfLength(5), randomAlphaOfLength(5));
429429
}
430430
}
431-
return createTestInstance(name, metadata);
431+
return metadata;
432+
}
433+
434+
private T createTestInstance(String name) {
435+
return createTestInstance(name, createTestMetadata());
432436
}
433437

434438
/** Return an instance on an unmapped field. */

test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ public void setSubAggregationsSupplier(Supplier<InternalAggregations> subAggrega
6666
this.subAggregationsSupplier = subAggregationsSupplier;
6767
}
6868

69+
public final InternalAggregations createSubAggregations() {
70+
return subAggregationsSupplier.get();
71+
}
72+
6973
@Override
7074
public void setUp() throws Exception {
7175
super.setUp();

0 commit comments

Comments
 (0)