
Commit 1c171b7

Adding logic for histogram aggregation using skiplist (opensearch-project#19130)

---------

Signed-off-by: Ankit Jain <[email protected]>
Signed-off-by: Asim Mahmood <[email protected]>
Signed-off-by: Ankit Jain <[email protected]>
Co-authored-by: Asim Mahmood <[email protected]>
1 parent 47df4bd commit 1c171b7

File tree

5 files changed: +251 −3 lines

- CHANGELOG.md
- server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java
- server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
- server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java
- test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java


CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Add new extensible method to DocRequest to specify type ([#19313](https://github.com/opensearch-project/OpenSearch/pull/19313))
 - [Rule based auto-tagging] Add Rule based auto-tagging IT ([#18550](https://github.com/opensearch-project/OpenSearch/pull/18550))
 - Add all-active ingestion as docrep equivalent in pull-based ingestion ([#19316](https://github.com/opensearch-project/OpenSearch/pull/19316))
+- Adding logic for histogram aggregation using skiplist ([#19130](https://github.com/opensearch-project/OpenSearch/pull/19130))
 - Add skip_list param for date, scaled float and token count fields ([#19142](https://github.com/opensearch-project/OpenSearch/pull/19142))

 ### Changed

server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java

Lines changed: 1 addition & 1 deletion
@@ -119,7 +119,7 @@ public void collect(int doc, long bucket) throws IOException {
     public abstract void collect(int doc, long owningBucketOrd) throws IOException;

     @Override
-    public final void collect(int doc) throws IOException {
+    public void collect(int doc) throws IOException {
         collect(doc, 0);
     }
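Note on this change: dropping `final` lets subclasses override the single-argument collect(int) directly, which the new skip-list collector in this commit does because it never needs an owning bucket ordinal. A minimal sketch of the pattern (class names here are illustrative, not part of the codebase):

import java.io.IOException;

// Illustrative shape of LeafBucketCollector after this change.
abstract class CollectorBase {
    public abstract void collect(int doc, long owningBucketOrd) throws IOException;

    // No longer final: subclasses with a cheaper single-argument path may override it.
    public void collect(int doc) throws IOException {
        collect(doc, 0);
    }
}

class FastPathCollector extends CollectorBase {
    @Override
    public void collect(int doc) throws IOException {
        // count the doc without consulting an owning bucket ordinal
    }

    @Override
    public void collect(int doc, long owningBucketOrd) throws IOException {
        collect(doc); // this collector ignores the ordinal, so delegate back
    }
}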

server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java

Lines changed: 145 additions & 1 deletion
@@ -31,10 +31,15 @@

 package org.opensearch.search.aggregations.bucket.histogram;

+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.DocValues;
+import org.apache.lucene.index.DocValuesSkipper;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.SortedNumericDocValues;
+import org.apache.lucene.search.DocIdStream;
+import org.apache.lucene.search.Scorable;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.util.CollectionUtil;
 import org.opensearch.common.Nullable;

@@ -88,6 +93,8 @@
  * @opensearch.internal
  */
 class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAggregator, StarTreePreComputeCollector {
+    private static final Logger logger = LogManager.getLogger(DateHistogramAggregator.class);
+
     private final ValuesSource.Numeric valuesSource;
     private final DocValueFormat formatter;
     private final Rounding rounding;

@@ -105,7 +112,8 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAggregator, StarTreePreComputeCollector {
     private boolean starTreeDateRoundingRequired = true;

     private final FilterRewriteOptimizationContext filterRewriteOptimizationContext;
-    public final String fieldName;
+    private final String fieldName;
+    private final boolean fieldIndexSort;

     DateHistogramAggregator(
         String name,

@@ -173,6 +181,7 @@ protected Function<Long, Long> bucketOrdProducer() {
         this.fieldName = (valuesSource instanceof ValuesSource.Numeric.FieldData)
             ? ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName()
             : null;
+        this.fieldIndexSort = this.fieldName == null ? false : context.getQueryShardContext().indexSortedOnField(fieldName);
         this.starTreeDateDimension = (context.getQueryShardContext().getStarTreeQueryContext() != null)
             ? fetchStarTreeCalendarUnit()
             : null;
@@ -209,9 +218,22 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
             return LeafBucketCollector.NO_OP_COLLECTOR;
         }

+        DocValuesSkipper skipper = null;
+        if (this.fieldName != null) {
+            skipper = ctx.reader().getDocValuesSkipper(this.fieldName);
+        }
         final SortedNumericDocValues values = valuesSource.longValues(ctx);
         final NumericDocValues singleton = DocValues.unwrapSingleton(values);

+        // If no subaggregations and index sorted on given field, we can use skip list based collector
+        logger.trace("Index sort field found: {}, skipper: {}", fieldIndexSort, skipper);
+        if (fieldIndexSort && skipper != null && singleton != null) {
+            // TODO: add hard bounds support
+            if (hardBounds == null && (sub == null || sub == LeafBucketCollector.NO_OP_COLLECTOR)) {
+                return new HistogramSkiplistLeafCollector(singleton, skipper, preparedRounding, bucketOrds, this::incrementBucketDocCount);
+            }
+        }
+
         if (singleton != null) {
             // Optimized path for single-valued fields
             return new LeafBucketCollectorBase(sub, values) {
@@ -397,4 +419,126 @@ public double bucketSize(long bucket, Rounding.DateTimeUnit unitSize) {
             return 1.0;
         }
     }
+
+    private static class HistogramSkiplistLeafCollector extends LeafBucketCollector {
+
+        private final NumericDocValues values;
+        private final DocValuesSkipper skipper;
+        private final Rounding.Prepared preparedRounding;
+        private final LongKeyedBucketOrds bucketOrds;
+        private final BiConsumer<Long, Long> incrementDocCount;
+
+        /**
+         * Max doc ID (inclusive) up to which all doc values may map to the same bucket.
+         */
+        private int upToInclusive = -1;
+
+        /**
+         * Whether the values of all docs up to {@link #upToInclusive} map to the same bucket.
+         */
+        private boolean upToSameBucket;
+
+        /**
+         * Index in bucketOrds for docs up to {@link #upToInclusive}.
+         */
+        private long upToBucketIndex;
+
+        HistogramSkiplistLeafCollector(
+            NumericDocValues values,
+            DocValuesSkipper skipper,
+            Rounding.Prepared preparedRounding,
+            LongKeyedBucketOrds bucketOrds,
+            BiConsumer<Long, Long> incrementDocCount
+        ) {
+            this.values = values;
+            this.skipper = skipper;
+            this.preparedRounding = preparedRounding;
+            this.bucketOrds = bucketOrds;
+            this.incrementDocCount = incrementDocCount;
+        }
+
+        @Override
+        public void setScorer(Scorable scorer) throws IOException {}
+
+        private void advanceSkipper(int doc) throws IOException {
+            if (doc > skipper.maxDocID(0)) {
+                skipper.advance(doc);
+            }
+            upToSameBucket = false;
+
+            if (skipper.minDocID(0) > doc) {
+                // Corner case which happens if `doc` doesn't have a value and is between two intervals of
+                // the doc-value skip index.
+                upToInclusive = skipper.minDocID(0) - 1;
+                return;
+            }
+
+            upToInclusive = skipper.maxDocID(0);
+
+            // Now find the highest level where all docs map to the same bucket.
+            for (int level = 0; level < skipper.numLevels(); ++level) {
+                int totalDocsAtLevel = skipper.maxDocID(level) - skipper.minDocID(level) + 1;
+                long minBucket = preparedRounding.round(skipper.minValue(level));
+                long maxBucket = preparedRounding.round(skipper.maxValue(level));
+
+                if (skipper.docCount(level) == totalDocsAtLevel && minBucket == maxBucket) {
+                    // All docs at this level have a value, and all values map to the same bucket.
+                    upToInclusive = skipper.maxDocID(level);
+                    upToSameBucket = true;
+                    upToBucketIndex = bucketOrds.add(0, maxBucket);
+                    if (upToBucketIndex < 0) {
+                        upToBucketIndex = -1 - upToBucketIndex;
+                    }
+                } else {
+                    break;
+                }
+            }
+        }
+
+        @Override
+        public void collect(int doc, long owningBucketOrd) throws IOException {
+            collect(doc);
+        }
+
+        @Override
+        public void collect(int doc) throws IOException {
+            if (doc > upToInclusive) {
+                advanceSkipper(doc);
+            }
+
+            if (upToSameBucket) {
+                incrementDocCount.accept(upToBucketIndex, 1L);
+            } else if (values.advanceExact(doc)) {
+                final long value = values.longValue();
+                long bucketIndex = bucketOrds.add(0, preparedRounding.round(value));
+                if (bucketIndex < 0) {
+                    bucketIndex = -1 - bucketIndex;
+                }
+                incrementDocCount.accept(bucketIndex, 1L);
+            }
+        }
+
+        @Override
+        public void collect(DocIdStream stream) throws IOException {
+            for (;;) {
+                int upToExclusive = upToInclusive + 1;
+                if (upToExclusive < 0) { // overflow
+                    upToExclusive = Integer.MAX_VALUE;
+                }
+
+                if (upToSameBucket) {
+                    long count = stream.count(upToExclusive);
+                    incrementDocCount.accept(upToBucketIndex, count);
+                } else {
+                    stream.forEach(upToExclusive, this::collect);
+                }
+
+                if (stream.mayHaveRemaining()) {
+                    advanceSkipper(upToExclusive);
+                } else {
+                    break;
+                }
+            }
+        }
+    }
 }
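The core of the optimization is the level walk in advanceSkipper: a skip-list block can be counted wholesale when every doc in it has a value (docCount(level) equals the block's doc-ID range) and the block's min and max values round to the same bucket. A self-contained toy illustrating that check, in plain Java with no Lucene types; the rounding function is a stand-in for Rounding.Prepared#round and all numbers are made up:

import java.util.HashMap;
import java.util.Map;
import java.util.function.LongUnaryOperator;

public class SkiplistFastPathSketch {
    public static void main(String[] args) {
        // Stand-in for Rounding.Prepared#round: bucket values by "year" of 365 units.
        LongUnaryOperator round = v -> v - (v % 365);

        // Pretend skipper state for one block at some level.
        long minValue = 730, maxValue = 1000;   // skipper.minValue(level) / maxValue(level)
        int minDocID = 100, maxDocID = 611;     // skipper.minDocID(level) / maxDocID(level)
        int docCount = 512;                     // skipper.docCount(level): docs that have a value

        Map<Long, Long> bucketDocCounts = new HashMap<>();
        int totalDocsAtLevel = maxDocID - minDocID + 1;
        if (docCount == totalDocsAtLevel && round.applyAsLong(minValue) == round.applyAsLong(maxValue)) {
            // Fast path: every doc in the block has a value and all values share one
            // bucket, so the whole block is counted in O(1) instead of O(docCount).
            bucketDocCounts.merge(round.applyAsLong(minValue), (long) docCount, Long::sum);
        }
        System.out.println(bucketDocCounts); // prints {730=512}
    }
}

When the check fails at level 0, the collector falls back to per-doc advanceExact plus rounding, which costs the same as the pre-existing single-valued path.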

server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java

Lines changed: 103 additions & 0 deletions
@@ -40,19 +40,28 @@
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.MatchNoDocsQuery;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.tests.index.RandomIndexWriter;
 import org.apache.lucene.tests.util.TestUtil;
+import org.opensearch.Version;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.common.settings.Settings;
 import org.opensearch.common.time.DateFormatters;
 import org.opensearch.core.common.breaker.CircuitBreaker;
 import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
+import org.opensearch.index.IndexSettings;
+import org.opensearch.index.fielddata.IndexNumericFieldData;
 import org.opensearch.index.mapper.DateFieldMapper;
 import org.opensearch.index.mapper.DocCountFieldMapper;
 import org.opensearch.index.mapper.MappedFieldType;
+import org.opensearch.search.MultiValueMode;
 import org.opensearch.search.aggregations.AggregationBuilder;
 import org.opensearch.search.aggregations.BucketOrder;
 import org.opensearch.search.aggregations.InternalAggregation;

@@ -246,6 +255,100 @@ public void testAsSubAgg() throws IOException {
         });
     }

+    public void testSkiplistWithSingleValueDates() throws IOException {
+        // Create index settings with an index sort on the date field.
+        Settings settings = Settings.builder()
+            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+            .putList("index.sort.field", AGGREGABLE_DATE)
+            .build();
+
+        IndexMetadata indexMetadata = new IndexMetadata.Builder("index").settings(settings).build();
+        IndexSettings indexSettings = new IndexSettings(indexMetadata, settings);
+
+        MappedFieldType fieldType = new DateFieldMapper.DateFieldType(AGGREGABLE_DATE);
+        IndexNumericFieldData fieldData = (IndexNumericFieldData) fieldType.fielddataBuilder("index", () -> {
+            throw new UnsupportedOperationException();
+        }).build(null, null);
+        SortField sortField = fieldData.sortField(null, MultiValueMode.MIN, null, false);
+        try (Directory directory = newDirectory()) {
+            IndexWriterConfig config = newIndexWriterConfig();
+            config.setMergePolicy(NoMergePolicy.INSTANCE);
+            config.setIndexSort(new Sort(sortField));
+            String filterField = "type";
+            try (IndexWriter indexWriter = new IndexWriter(directory, config)) {
+
+                // First commit - 5 dates with type 1
+                for (int i = 0; i < 5; i++) {
+                    Document doc = new Document();
+                    long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i)))
+                        .toInstant()
+                        .toEpochMilli();
+                    doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp));
+                    doc.add(new LongPoint(filterField, 1));
+                    indexWriter.addDocument(doc);
+                }
+                indexWriter.commit();
+
+                // Second commit - 3 more dates with type 2
+                for (int i = 5; i < 8; i++) {
+                    Document doc = new Document();
+                    long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i)))
+                        .toInstant()
+                        .toEpochMilli();
+                    doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp));
+                    doc.add(new LongPoint(filterField, 2));
+                    indexWriter.addDocument(doc);
+                }
+                indexWriter.commit();
+
+                // Third commit - 2 more dates with type 2
+                for (int i = 8; i < 10; i++) {
+                    Document doc = new Document();
+                    long timestamp = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(DATASET.get(i)))
+                        .toInstant()
+                        .toEpochMilli();
+                    doc.add(SortedNumericDocValuesField.indexedField(AGGREGABLE_DATE, timestamp));
+                    doc.add(new LongPoint(filterField, 2));
+                    indexWriter.addDocument(doc);
+                }
+                indexWriter.commit();
+            }
+
+            try (IndexReader indexReader = DirectoryReader.open(directory)) {
+                IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
+
+                DateHistogramAggregationBuilder aggregationBuilder = new DateHistogramAggregationBuilder("test").field(AGGREGABLE_DATE)
+                    .calendarInterval(DateHistogramInterval.YEAR);
+
+                Query query = LongPoint.newExactQuery(filterField, 2);
+
+                InternalDateHistogram histogram = searchAndReduce(
+                    indexSettings,
+                    indexSearcher,
+                    query,
+                    aggregationBuilder,
+                    1000,
+                    false,
+                    fieldType
+                );
+
+                assertEquals(3, histogram.getBuckets().size()); // 2015, 2016, 2017 (only type 2 docs)
+
+                assertEquals("2015-01-01T00:00:00.000Z", histogram.getBuckets().get(0).getKeyAsString());
+                assertEquals(3, histogram.getBuckets().get(0).getDocCount());
+
+                assertEquals("2016-01-01T00:00:00.000Z", histogram.getBuckets().get(1).getKeyAsString());
+                assertEquals(1, histogram.getBuckets().get(1).getDocCount());
+
+                assertEquals("2017-01-01T00:00:00.000Z", histogram.getBuckets().get(2).getKeyAsString());
+                assertEquals(1, histogram.getBuckets().get(2).getDocCount());
+            }
+        }
+    }
+
 public void testNoDocsDeprecatedInterval() throws IOException {
     Query query = new MatchNoDocsQuery();
     List<String> dates = Collections.emptyList();
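The expected counts can be traced with a toy version of the collection loop: under the type=2 filter only docs 5-9 match, docs that fall inside a "same bucket" skipper range are bulk-counted, and the rest go through per-doc rounding. Everything below is illustrative; the doc IDs, bucket keys, and rounding are invented to mirror the test's assertions rather than taken from the real segments.

import java.util.Map;
import java.util.TreeMap;

public class SkiplistCollectSketch {
    public static void main(String[] args) {
        int[] matchingDocs = { 5, 6, 7, 8, 9 };   // docs surviving the type=2 filter
        int sameBucketUpToInclusive = 7;          // pretend docs [0, 7] share one bucket
        long sameBucketKey = 2015;

        Map<Long, Long> counts = new TreeMap<>();
        for (int doc : matchingDocs) {
            if (doc <= sameBucketUpToInclusive) {
                // Bulk-countable region: no doc-value lookup needed.
                counts.merge(sameBucketKey, 1L, Long::sum);
            } else {
                // Per-doc fallback: stand-in for advanceExact + preparedRounding.round.
                counts.merge(sameBucketKey + (doc - sameBucketUpToInclusive), 1L, Long::sum);
            }
        }
        System.out.println(counts); // {2015=3, 2016=1, 2017=1}, matching the assertions
    }
}

Note the test's setup choices: NoMergePolicy plus three separate commits yield three segments, so each segment's DocValuesSkipper and advanceSkipper path is exercised independently.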

test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java

Lines changed: 1 addition & 1 deletion
@@ -702,7 +702,7 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
             maxBucket,
             new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
         );
-        C root = createAggregator(query, builder, searcher, bucketConsumer, fieldTypes);
+        C root = createAggregator(query, builder, searcher, indexSettings, bucketConsumer, fieldTypes);

         if (shardFanOut && searcher.getIndexReader().leaves().size() > 0) {
             assertThat(ctx, instanceOf(CompositeReaderContext.class));
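Threading indexSettings through matters because the aggregator's new gate asks the query shard context whether the index is sorted on the target field; a test harness that drops the settings would never take the skip-list path. As a rough sketch of what such a check boils down to (hypothetical helper, not the actual OpenSearch implementation):

import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;

// Hypothetical sketch: true when the configured index sort includes `field`.
final class IndexSortCheck {
    static boolean indexSortedOnField(Sort indexSort, String field) {
        if (indexSort == null) {
            return false; // no index sort configured
        }
        for (SortField sortField : indexSort.getSort()) {
            if (field.equals(sortField.getField())) {
                return true;
            }
        }
        return false;
    }
}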
