Skip to content

Commit 4f4614a

Browse files
authored
Long key bucket ords key iterator (#95809)
Relates to #89437 In order to merge aggregations using LongKeyedBucketOrds, we need a way to align the buckets by key. This adds an iterator to return all keys for a given owning bucket ordinal, in natural sorted order. This can then be used to merge without generating and sorting bucket objects directly. This makes two big assumptions: 1 - the total set of keys is small enough that we don't need to put the key set in a Big Arrays backed data structure 2 - Building the tree set of keys (at reduce time) will not be excessively expensive. We can mitigate 1 if we build a big arrays backed balanced binary tree. That can be done in a follow up PR to this one.
1 parent deffa80 commit 4f4614a

File tree

2 files changed

+255
-15
lines changed

2 files changed

+255
-15
lines changed

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongKeyedBucketOrds.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,32 @@
1414
import org.elasticsearch.core.Releasable;
1515
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
1616

17+
import java.util.Iterator;
1718
import java.util.Locale;
19+
import java.util.NoSuchElementException;
20+
import java.util.TreeSet;
1821

1922
/**
2023
* Maps owning bucket ordinals and long bucket keys to bucket ordinals.
2124
*/
2225
public abstract class LongKeyedBucketOrds implements Releasable {
2326
/**
2427
* Build a {@link LongKeyedBucketOrds} who's values have unknown bounds.
28+
*
29+
* @param cardinality - This should come from the owning aggregation, and is used as an upper bound on the
30+
* owning bucket ordinals.
2531
*/
2632
public static LongKeyedBucketOrds build(BigArrays bigArrays, CardinalityUpperBound cardinality) {
2733
return cardinality.map(estimate -> estimate < 2 ? new FromSingle(bigArrays) : new FromMany(bigArrays));
2834
}
2935

3036
/**
3137
* Build a {@link LongKeyedBucketOrds} who's values have known bounds.
38+
*
39+
* @param cardinality - This should come from the owning aggregation, and is used as an upper bound on the
40+
* owning bucket ordinals.
41+
* @param min - The minimum key value for this aggregation
42+
* @param max - The maximum key value for this aggregation
3243
*/
3344
public static LongKeyedBucketOrds buildForValueRange(BigArrays bigArrays, CardinalityUpperBound cardinality, long min, long max) {
3445
return cardinality.map((int cardinalityUpperBound) -> {
@@ -49,6 +60,8 @@ public static LongKeyedBucketOrds buildForValueRange(BigArrays bigArrays, Cardin
4960
});
5061
}
5162

63+
private TreeSet<Long> keySet = null;
64+
5265
private LongKeyedBucketOrds() {}
5366

5467
/**
@@ -101,6 +114,58 @@ private LongKeyedBucketOrds() {}
101114
*/
102115
public abstract BucketOrdsEnum ordsEnum(long owningBucketOrd);
103116

117+
/**
118+
* Return an iterator for all keys in the given owning bucket, ordered in natural sort order.
119+
* This is suitable for aligning buckets across different instances of an aggregation.
120+
*
121+
* @param owningBucketOrd Only return keys that occured under this owning bucket
122+
* @return a sorted iterator of long key values
123+
*/
124+
public Iterator<Long> keyOrderedIterator(long owningBucketOrd) {
125+
if (keySet == null) {
126+
// TreeSet's contract includes a naturally ordered iterator
127+
keySet = new TreeSet<>();
128+
for (long ord = 0; ord < size(); ord++) {
129+
keySet.add(this.get(ord));
130+
}
131+
}
132+
Iterator<Long> toReturn = new Iterator<>() {
133+
Iterator<Long> wrapped = keySet.iterator();
134+
long filterOrd = owningBucketOrd;
135+
long next;
136+
boolean hasNext = true;
137+
138+
@Override
139+
public boolean hasNext() {
140+
return hasNext;
141+
}
142+
143+
@Override
144+
public Long next() {
145+
if (hasNext == false) {
146+
throw new NoSuchElementException();
147+
}
148+
long toReturn = next;
149+
hasNext = false;
150+
while (wrapped.hasNext()) {
151+
long candidate = wrapped.next();
152+
if (find(filterOrd, candidate) != -1) {
153+
next = candidate;
154+
hasNext = true;
155+
break;
156+
}
157+
}
158+
return toReturn;
159+
}
160+
};
161+
toReturn.next(); // Prime the first actual value
162+
return toReturn;
163+
}
164+
165+
public void close() {
166+
keySet = null;
167+
}
168+
104169
/**
105170
* An iterator for buckets inside a particular {@code owningBucketOrd}.
106171
*/
@@ -223,6 +288,7 @@ public long ord() {
223288

224289
@Override
225290
public void close() {
291+
super.close();
226292
ords.close();
227293
}
228294
}
@@ -320,6 +386,7 @@ public long ord() {
320386

321387
@Override
322388
public void close() {
389+
super.close();
323390
ords.close();
324391
}
325392
}
@@ -455,6 +522,7 @@ public long ord() {
455522

456523
@Override
457524
public void close() {
525+
super.close();
458526
ords.close();
459527
}
460528
}

server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/LongKeyedBucketOrdsTests.java

Lines changed: 187 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,16 @@
1515
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
1616
import org.elasticsearch.test.ESTestCase;
1717

18+
import java.util.HashMap;
1819
import java.util.HashSet;
20+
import java.util.Iterator;
21+
import java.util.Map;
1922
import java.util.Objects;
2023
import java.util.Set;
24+
import java.util.TreeSet;
2125

2226
import static org.hamcrest.Matchers.equalTo;
27+
import static org.hamcrest.Matchers.is;
2328

2429
public class LongKeyedBucketOrdsTests extends ESTestCase {
2530
private final MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
@@ -32,6 +37,21 @@ public void testSurpriseCollectsFromSingleBucket() {
3237
collectsFromSingleBucketCase(LongKeyedBucketOrds.build(bigArrays, CardinalityUpperBound.MANY));
3338
}
3439

40+
public void testCollectsFromManyBuckets() {
41+
try (LongKeyedBucketOrds ords = LongKeyedBucketOrds.build(bigArrays, CardinalityUpperBound.MANY)) {
42+
assertCollectsFromManyBuckets(ords, scaledRandomIntBetween(1, 10000), Long.MIN_VALUE, Long.MAX_VALUE);
43+
}
44+
}
45+
46+
public void testCollectsFromManyBucketsSmall() {
47+
int owningBucketOrds = scaledRandomIntBetween(2, 10000);
48+
long maxValue = randomLongBetween(10000 / owningBucketOrds, 2 << (16 * 3));
49+
CardinalityUpperBound cardinality = CardinalityUpperBound.ONE.multiply(owningBucketOrds);
50+
try (LongKeyedBucketOrds ords = LongKeyedBucketOrds.buildForValueRange(bigArrays, cardinality, 0, maxValue)) {
51+
assertCollectsFromManyBuckets(ords, owningBucketOrds, 0, maxValue);
52+
}
53+
}
54+
3555
private void collectsFromSingleBucketCase(LongKeyedBucketOrds ords) {
3656
try {
3757
// Test a few explicit values
@@ -93,21 +113,6 @@ private void collectsFromSingleBucketCase(LongKeyedBucketOrds ords) {
93113
}
94114
}
95115

96-
public void testCollectsFromManyBuckets() {
97-
try (LongKeyedBucketOrds ords = LongKeyedBucketOrds.build(bigArrays, CardinalityUpperBound.MANY)) {
98-
assertCollectsFromManyBuckets(ords, scaledRandomIntBetween(1, 10000), Long.MIN_VALUE, Long.MAX_VALUE);
99-
}
100-
}
101-
102-
public void testCollectsFromManyBucketsSmall() {
103-
int owningBucketOrds = scaledRandomIntBetween(2, 10000);
104-
long maxValue = randomLongBetween(10000 / owningBucketOrds, 2 << (16 * 3));
105-
CardinalityUpperBound cardinality = CardinalityUpperBound.ONE.multiply(owningBucketOrds);
106-
try (LongKeyedBucketOrds ords = LongKeyedBucketOrds.buildForValueRange(bigArrays, cardinality, 0, maxValue)) {
107-
assertCollectsFromManyBuckets(ords, owningBucketOrds, 0, maxValue);
108-
}
109-
}
110-
111116
private void assertCollectsFromManyBuckets(LongKeyedBucketOrds ords, int maxAllowedOwningBucketOrd, long minValue, long maxValue) {
112117
// Test a few explicit values
113118
assertThat(ords.add(0, 0), equalTo(0L));
@@ -176,6 +181,173 @@ private void assertCollectsFromManyBuckets(LongKeyedBucketOrds ords, int maxAllo
176181
assertThat(ords.maxOwningBucketOrd(), equalTo(maxOwningBucketOrd));
177182
}
178183

184+
public void testKeyIteratorSingleValue() {
185+
try (LongKeyedBucketOrds ords = LongKeyedBucketOrds.build(bigArrays, CardinalityUpperBound.ONE)) {
186+
// Test a few explicit values
187+
assertThat(ords.add(0, 0), equalTo(0L));
188+
assertThat(ords.add(0, 1000), equalTo(1L));
189+
assertThat(ords.add(0, 0), equalTo(-1L));
190+
assertThat(ords.add(0, 1000), equalTo(-2L));
191+
assertThat(ords.find(0, 0), equalTo(0L));
192+
assertThat(ords.find(0, 1000), equalTo(1L));
193+
194+
// And some random values
195+
Set<Long> seen = new TreeSet<>();
196+
seen.add(0L);
197+
seen.add(1000L);
198+
assertThat(ords.size(), equalTo(2L));
199+
long[] values = new long[scaledRandomIntBetween(1, 10000)];
200+
for (int i = 0; i < values.length; i++) {
201+
values[i] = randomValueOtherThanMany(seen::contains, ESTestCase::randomLong);
202+
seen.add(values[i]);
203+
}
204+
for (int i = 0; i < values.length; i++) {
205+
assertThat(ords.find(0, values[i]), equalTo(-1L));
206+
assertThat(ords.add(0, values[i]), equalTo(i + 2L));
207+
assertThat(ords.find(0, values[i]), equalTo(i + 2L));
208+
assertThat(ords.size(), equalTo(i + 3L));
209+
}
210+
211+
// For the single value case, the sorted iterator should exactly equal the values tree set iterator
212+
Iterator<Long> expected = seen.iterator();
213+
Iterator<Long> actual = ords.keyOrderedIterator(0);
214+
assertNotSame(expected, actual);
215+
while (expected.hasNext()) {
216+
assertThat(actual.hasNext(), is(true));
217+
long actualNext = actual.next();
218+
long expectedNext = expected.next();
219+
assertThat(actualNext, equalTo(expectedNext));
220+
}
221+
assertThat(actual.hasNext(), is(false));
222+
}
223+
}
224+
225+
public void testKeyIteratormanyBuckets() {
226+
long maxAllowedOwningBucketOrd = scaledRandomIntBetween(1, 10000);
227+
long minValue = Long.MIN_VALUE;
228+
long maxValue = Long.MAX_VALUE;
229+
try (LongKeyedBucketOrds ords = LongKeyedBucketOrds.build(bigArrays, CardinalityUpperBound.MANY)) {
230+
Map<Long, TreeSet<Long>> expected = new HashMap<>();
231+
// Test a few explicit values
232+
assertThat(ords.add(0, 0), equalTo(0L));
233+
assertThat(ords.add(1, 0), equalTo(1L));
234+
assertThat(ords.add(0, 0), equalTo(-1L));
235+
assertThat(ords.add(1, 0), equalTo(-2L));
236+
assertThat(ords.size(), equalTo(2L));
237+
assertThat(ords.find(0, 0), equalTo(0L));
238+
assertThat(ords.find(1, 0), equalTo(1L));
239+
240+
Set<OwningBucketOrdAndValue> seen = new HashSet<>();
241+
seen.add(new OwningBucketOrdAndValue(0, 0));
242+
seen.add(new OwningBucketOrdAndValue(1, 0));
243+
244+
expected.put(0L, new TreeSet<>());
245+
expected.get(0L).add(0L);
246+
expected.put(1L, new TreeSet<>());
247+
expected.get(1L).add(0L);
248+
249+
OwningBucketOrdAndValue[] values = new OwningBucketOrdAndValue[scaledRandomIntBetween(1, 10000)];
250+
for (int i = 0; i < values.length; i++) {
251+
values[i] = randomValueOtherThanMany(
252+
seen::contains,
253+
() -> new OwningBucketOrdAndValue(
254+
randomLongBetween(0, maxAllowedOwningBucketOrd),
255+
randomLongBetween(minValue, maxValue)
256+
)
257+
);
258+
seen.add(values[i]);
259+
if (expected.containsKey(values[i].owningBucketOrd) == false) {
260+
expected.put(values[i].owningBucketOrd, new TreeSet<>());
261+
}
262+
expected.get(values[i].owningBucketOrd).add(values[i].value);
263+
}
264+
for (int i = 0; i < values.length; i++) {
265+
assertThat(ords.find(values[i].owningBucketOrd, values[i].value), equalTo(-1L));
266+
assertThat(ords.add(values[i].owningBucketOrd, values[i].value), equalTo(i + 2L));
267+
assertThat(ords.find(values[i].owningBucketOrd, values[i].value), equalTo(i + 2L));
268+
assertThat(ords.size(), equalTo(i + 3L));
269+
}
270+
271+
for (Long owningBucketOrd : expected.keySet()) {
272+
Iterator<Long> expectedIterator = expected.get(owningBucketOrd).iterator();
273+
Iterator<Long> actualIterator = ords.keyOrderedIterator(owningBucketOrd);
274+
275+
assertNotSame(expectedIterator, actualIterator);
276+
while (expectedIterator.hasNext()) {
277+
assertThat(actualIterator.hasNext(), is(true));
278+
long actualNext = actualIterator.next();
279+
long expectedNext = expectedIterator.next();
280+
assertThat(actualNext, equalTo(expectedNext));
281+
}
282+
assertThat(actualIterator.hasNext(), is(false));
283+
}
284+
}
285+
}
286+
287+
public void testKeyIteratorManyBucketsSmall() {
288+
int maxAllowedOwningBucketOrd = scaledRandomIntBetween(2, 10000);
289+
long minValue = 0;
290+
long maxValue = randomLongBetween(10000 / maxAllowedOwningBucketOrd, 2 << (16 * 3));
291+
CardinalityUpperBound cardinality = CardinalityUpperBound.ONE.multiply(maxAllowedOwningBucketOrd);
292+
try (LongKeyedBucketOrds ords = LongKeyedBucketOrds.buildForValueRange(bigArrays, cardinality, minValue, maxValue)) {
293+
assertTrue(ords instanceof LongKeyedBucketOrds.FromManySmall);
294+
Map<Long, TreeSet<Long>> expected = new HashMap<>();
295+
// Test a few explicit values
296+
assertThat(ords.add(0, 0), equalTo(0L));
297+
assertThat(ords.add(1, 0), equalTo(1L));
298+
assertThat(ords.add(0, 0), equalTo(-1L));
299+
assertThat(ords.add(1, 0), equalTo(-2L));
300+
assertThat(ords.size(), equalTo(2L));
301+
assertThat(ords.find(0, 0), equalTo(0L));
302+
assertThat(ords.find(1, 0), equalTo(1L));
303+
304+
Set<OwningBucketOrdAndValue> seen = new HashSet<>();
305+
seen.add(new OwningBucketOrdAndValue(0, 0));
306+
seen.add(new OwningBucketOrdAndValue(1, 0));
307+
308+
expected.put(0L, new TreeSet<>());
309+
expected.get(0L).add(0L);
310+
expected.put(1L, new TreeSet<>());
311+
expected.get(1L).add(0L);
312+
313+
OwningBucketOrdAndValue[] values = new OwningBucketOrdAndValue[scaledRandomIntBetween(1, 10000)];
314+
for (int i = 0; i < values.length; i++) {
315+
values[i] = randomValueOtherThanMany(
316+
seen::contains,
317+
() -> new OwningBucketOrdAndValue(
318+
randomLongBetween(0, maxAllowedOwningBucketOrd),
319+
randomLongBetween(minValue, maxValue)
320+
)
321+
);
322+
seen.add(values[i]);
323+
if (expected.containsKey(values[i].owningBucketOrd) == false) {
324+
expected.put(values[i].owningBucketOrd, new TreeSet<>());
325+
}
326+
expected.get(values[i].owningBucketOrd).add(values[i].value);
327+
}
328+
for (int i = 0; i < values.length; i++) {
329+
assertThat(ords.find(values[i].owningBucketOrd, values[i].value), equalTo(-1L));
330+
assertThat(ords.add(values[i].owningBucketOrd, values[i].value), equalTo(i + 2L));
331+
assertThat(ords.find(values[i].owningBucketOrd, values[i].value), equalTo(i + 2L));
332+
assertThat(ords.size(), equalTo(i + 3L));
333+
}
334+
335+
for (Long owningBucketOrd : expected.keySet()) {
336+
Iterator<Long> expectedIterator = expected.get(owningBucketOrd).iterator();
337+
Iterator<Long> actualIterator = ords.keyOrderedIterator(owningBucketOrd);
338+
339+
assertNotSame(expectedIterator, actualIterator);
340+
while (expectedIterator.hasNext()) {
341+
assertThat(actualIterator.hasNext(), is(true));
342+
long actualNext = actualIterator.next();
343+
long expectedNext = expectedIterator.next();
344+
assertThat(actualNext, equalTo(expectedNext));
345+
}
346+
assertThat(actualIterator.hasNext(), is(false));
347+
}
348+
}
349+
}
350+
179351
private class OwningBucketOrdAndValue {
180352
private final long owningBucketOrd;
181353
private final long value;

0 commit comments

Comments
 (0)