Skip to content

Commit da18cc6

Browse files
Combining filter rewrite and skip list to optimize sub aggregation (#19573)
Signed-off-by: Ankit Jain <[email protected]> Signed-off-by: Asim Mahmood <[email protected]> Co-authored-by: Asim Mahmood <[email protected]>
1 parent 35f97a2 commit da18cc6

File tree

6 files changed

+407
-156
lines changed

6 files changed

+407
-156
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3636
- Support dynamic consumer configuration update in pull-based ingestion ([#19963](https://github.com/opensearch-project/OpenSearch/pull/19963))
3737

3838
### Changed
39+
- Combining filter rewrite and skip list to optimize sub aggregation ([#19573](https://github.com/opensearch-project/OpenSearch/pull/19573))
3940
- Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350))
4041
- Refactor to move prepareIndex and prepareDelete methods to Engine class ([#19551](https://github.com/opensearch-project/OpenSearch/pull/19551))
4142
- Omit maxScoreCollector in SimpleTopDocsCollectorContext when concurrent segment search enabled ([#19584](https://github.com/opensearch-project/OpenSearch/pull/19584))

server/src/main/java/org/opensearch/search/aggregations/AggregatorBase.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public abstract class AggregatorBase extends Aggregator {
7373
private Map<String, Aggregator> subAggregatorbyName;
7474
private final CircuitBreakerService breakerService;
7575
private long requestBytesUsed;
76+
protected LeafCollectionMode leafCollectorMode = LeafCollectionMode.NORMAL;
7677

7778
/**
7879
* Constructs a new Aggregator.
@@ -236,6 +237,23 @@ protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws
236237
return false;
237238
}
238239

240+
    /**
     * Returns the collection mode this aggregator selected for the current leaf.
     * To be used in conjunction with <code>tryPrecomputeAggregationForLeaf()</code>
     * or <code>getLeafCollector</code> method.
     *
     * @return the current {@link LeafCollectionMode} (defaults to {@code NORMAL})
     */
    public LeafCollectionMode getLeafCollectorMode() {
        return leafCollectorMode;
    }
247+
248+
/**
249+
* To be used in conjunction with <code>tryPrecomputeAggregationForLeaf()</code>
250+
* or <code>getLeafCollector</code> method.
251+
*/
252+
public enum LeafCollectionMode {
253+
NORMAL,
254+
FILTER_REWRITE
255+
}
256+
239257
@Override
240258
public final void preCollection() throws IOException {
241259
List<BucketCollector> collectors = Arrays.asList(subAggregators);
@@ -343,4 +361,5 @@ protected void checkCancelled() {
343361
throw new TaskCancelledException("The query has been cancelled");
344362
}
345363
}
364+
346365
}
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search.aggregations.bucket;
10+
11+
import org.apache.lucene.index.DocValuesSkipper;
12+
import org.apache.lucene.index.NumericDocValues;
13+
import org.apache.lucene.search.DocIdStream;
14+
import org.apache.lucene.search.Scorable;
15+
import org.opensearch.common.Rounding;
16+
import org.opensearch.search.aggregations.LeafBucketCollector;
17+
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
18+
19+
import java.io.IOException;
20+
21+
/**
22+
* Histogram collection logic using skip list.
23+
*
24+
* @opensearch.internal
25+
*/
26+
public class HistogramSkiplistLeafCollector extends LeafBucketCollector {
27+
28+
private final NumericDocValues values;
29+
private final DocValuesSkipper skipper;
30+
private final Rounding.Prepared preparedRounding;
31+
private final LongKeyedBucketOrds bucketOrds;
32+
private final LeafBucketCollector sub;
33+
private final BucketsAggregator aggregator;
34+
35+
/**
36+
* Max doc ID (inclusive) up to which all docs values may map to the same
37+
* bucket.
38+
*/
39+
private int upToInclusive = -1;
40+
41+
/**
42+
* Whether all docs up to {@link #upToInclusive} values map to the same bucket.
43+
*/
44+
private boolean upToSameBucket;
45+
46+
/**
47+
* Index in bucketOrds for docs up to {@link #upToInclusive}.
48+
*/
49+
private long upToBucketIndex;
50+
51+
public HistogramSkiplistLeafCollector(
52+
NumericDocValues values,
53+
DocValuesSkipper skipper,
54+
Rounding.Prepared preparedRounding,
55+
LongKeyedBucketOrds bucketOrds,
56+
LeafBucketCollector sub,
57+
BucketsAggregator aggregator
58+
) {
59+
this.values = values;
60+
this.skipper = skipper;
61+
this.preparedRounding = preparedRounding;
62+
this.bucketOrds = bucketOrds;
63+
this.sub = sub;
64+
this.aggregator = aggregator;
65+
}
66+
67+
@Override
68+
public void setScorer(Scorable scorer) throws IOException {
69+
if (sub != null) {
70+
sub.setScorer(scorer);
71+
}
72+
}
73+
74+
private void advanceSkipper(int doc, long owningBucketOrd) throws IOException {
75+
if (doc > skipper.maxDocID(0)) {
76+
skipper.advance(doc);
77+
}
78+
upToSameBucket = false;
79+
80+
if (skipper.minDocID(0) > doc) {
81+
// Corner case which happens if `doc` doesn't have a value and is between two
82+
// intervals of
83+
// the doc-value skip index.
84+
upToInclusive = skipper.minDocID(0) - 1;
85+
return;
86+
}
87+
88+
upToInclusive = skipper.maxDocID(0);
89+
90+
// Now find the highest level where all docs map to the same bucket.
91+
for (int level = 0; level < skipper.numLevels(); ++level) {
92+
int totalDocsAtLevel = skipper.maxDocID(level) - skipper.minDocID(level) + 1;
93+
long minBucket = preparedRounding.round(skipper.minValue(level));
94+
long maxBucket = preparedRounding.round(skipper.maxValue(level));
95+
96+
if (skipper.docCount(level) == totalDocsAtLevel && minBucket == maxBucket) {
97+
// All docs at this level have a value, and all values map to the same bucket.
98+
upToInclusive = skipper.maxDocID(level);
99+
upToSameBucket = true;
100+
upToBucketIndex = bucketOrds.add(owningBucketOrd, maxBucket);
101+
if (upToBucketIndex < 0) {
102+
upToBucketIndex = -1 - upToBucketIndex;
103+
}
104+
} else {
105+
break;
106+
}
107+
}
108+
}
109+
110+
@Override
111+
public void collect(int doc, long owningBucketOrd) throws IOException {
112+
if (doc > upToInclusive) {
113+
advanceSkipper(doc, owningBucketOrd);
114+
}
115+
116+
if (upToSameBucket) {
117+
aggregator.incrementBucketDocCount(upToBucketIndex, 1L);
118+
sub.collect(doc, upToBucketIndex);
119+
} else if (values.advanceExact(doc)) {
120+
final long value = values.longValue();
121+
long bucketIndex = bucketOrds.add(owningBucketOrd, preparedRounding.round(value));
122+
if (bucketIndex < 0) {
123+
bucketIndex = -1 - bucketIndex;
124+
aggregator.collectExistingBucket(sub, doc, bucketIndex);
125+
} else {
126+
aggregator.collectBucket(sub, doc, bucketIndex);
127+
}
128+
}
129+
}
130+
131+
@Override
132+
public void collect(DocIdStream stream) throws IOException {
133+
// This will only be called if its the top agg
134+
collect(stream, 0);
135+
}
136+
137+
@Override
138+
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
139+
// This will only be called if its the sub aggregation
140+
for (;;) {
141+
int upToExclusive = upToInclusive + 1;
142+
if (upToExclusive < 0) { // overflow
143+
upToExclusive = Integer.MAX_VALUE;
144+
}
145+
146+
if (upToSameBucket) {
147+
if (sub == NO_OP_COLLECTOR) {
148+
// stream.count maybe faster when we don't need to handle sub-aggs
149+
long count = stream.count(upToExclusive);
150+
aggregator.incrementBucketDocCount(upToBucketIndex, count);
151+
} else {
152+
final int[] count = { 0 };
153+
stream.forEach(upToExclusive, doc -> {
154+
sub.collect(doc, upToBucketIndex);
155+
count[0]++;
156+
});
157+
aggregator.incrementBucketDocCount(upToBucketIndex, count[0]);
158+
}
159+
} else {
160+
stream.forEach(upToExclusive, doc -> collect(doc, owningBucketOrd));
161+
}
162+
163+
if (stream.mayHaveRemaining()) {
164+
advanceSkipper(upToExclusive, owningBucketOrd);
165+
} else {
166+
break;
167+
}
168+
}
169+
}
170+
}

0 commit comments

Comments
 (0)