Skip to content

Commit 8c244c0

Browse files
Fix the regression of terms agg optimization (#20623)
* Fix the regression of terms agg optimization
  Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
* Change log
  Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
* add code coverage
  Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

---------

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
1 parent db0a16d commit 8c244c0

File tree

10 files changed: +159 -2 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2626
- Fix WLM workload group creation failing due to updated_at clock skew ([#20486](https://github.com/opensearch-project/OpenSearch/pull/20486))
2727
- Fix SLF4J component error ([#20587](https://github.com/opensearch-project/OpenSearch/pull/20587))
2828
- Service does not start on Windows with OpenJDK ([#20615](https://github.com/opensearch-project/OpenSearch/pull/20615))
29+
- Fix the regression of terms agg optimization at high cardinality ([#20623](https://github.com/opensearch-project/OpenSearch/pull/20623))
2930

3031
### Dependencies
3132
- Bump `ch.qos.logback:logback-core` and `ch.qos.logback:logback-classic` from 1.5.24 to 1.5.27 ([#20525](https://github.com/opensearch-project/OpenSearch/pull/20525))

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -594,6 +594,7 @@ public void apply(Settings value, Settings current, Settings previous) {
594594
SearchService.SEARCH_MAX_QUERY_STRING_LENGTH,
595595
SearchService.SEARCH_MAX_QUERY_STRING_LENGTH_MONITOR_ONLY,
596596
SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD,
597+
SearchService.TERMS_AGGREGATION_MAX_PRECOMPUTE_CARDINALITY,
597598
CardinalityAggregator.CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_ENABLED,
598599
CardinalityAggregator.CARDINALITY_AGGREGATION_HYBRID_COLLECTOR_MEMORY_THRESHOLD,
599600
SearchService.KEYWORD_INDEX_OR_DOC_VALUES_ENABLED,

server/src/main/java/org/opensearch/search/DefaultSearchContext.java

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -230,6 +230,7 @@ final class DefaultSearchContext extends SearchContext {
230230
private final int maxAggRewriteFilters;
231231
private final int filterRewriteSegmentThreshold;
232232
private final int cardinalityAggregationPruningThreshold;
233+
private final long termsAggregationMaxPrecomputeCardinality;
233234
private final CardinalityAggregationContext cardinalityAggregationContext;
234235
private final int bucketSelectionStrategyFactor;
235236
private final boolean keywordIndexOrDocValuesEnabled;
@@ -298,6 +299,7 @@ final class DefaultSearchContext extends SearchContext {
298299
this.maxAggRewriteFilters = evaluateFilterRewriteSetting();
299300
this.filterRewriteSegmentThreshold = evaluateAggRewriteFilterSegThreshold();
300301
this.cardinalityAggregationPruningThreshold = evaluateCardinalityAggregationPruningThreshold();
302+
this.termsAggregationMaxPrecomputeCardinality = evaluateTermsAggregationMaxPrecomputeCardinality();
301303
this.cardinalityAggregationContext = evaluateCardinalityAggregationContext();
302304
this.bucketSelectionStrategyFactor = evaluateBucketSelectionStrategyFactor();
303305
this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories;
@@ -1260,6 +1262,11 @@ public int cardinalityAggregationPruningThreshold() {
12601262
return cardinalityAggregationPruningThreshold;
12611263
}
12621264

1265+
@Override
1266+
public long termsAggregationMaxPrecomputeCardinality() {
1267+
return termsAggregationMaxPrecomputeCardinality;
1268+
}
1269+
12631270
@Override
12641271
public CardinalityAggregationContext cardinalityAggregationContext() {
12651272
return cardinalityAggregationContext;
@@ -1282,6 +1289,13 @@ private int evaluateCardinalityAggregationPruningThreshold() {
12821289
return 0;
12831290
}
12841291

1292+
private long evaluateTermsAggregationMaxPrecomputeCardinality() {
1293+
if (clusterService != null) {
1294+
return clusterService.getClusterSettings().get(SearchService.TERMS_AGGREGATION_MAX_PRECOMPUTE_CARDINALITY);
1295+
}
1296+
return 30_000L;
1297+
}
1298+
12851299
private CardinalityAggregationContext evaluateCardinalityAggregationContext() {
12861300
if (clusterService != null) {
12871301
boolean hybridCollectorEnabled = clusterService.getClusterSettings()

server/src/main/java/org/opensearch/search/SearchService.java

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -442,6 +442,14 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
442442
Property.NodeScope
443443
);
444444

445+
public static final Setting<Long> TERMS_AGGREGATION_MAX_PRECOMPUTE_CARDINALITY = Setting.longSetting(
446+
"search.aggregations.terms.max_precompute_cardinality",
447+
30_000L,
448+
0L,
449+
Property.Dynamic,
450+
Property.NodeScope
451+
);
452+
445453
public static final int DEFAULT_BUCKET_SELECTION_STRATEGY_FACTOR = 5;
446454
public static final Setting<Integer> BUCKET_SELECTION_STRATEGY_FACTOR_SETTING = Setting.intSetting(
447455
"search.aggregation.bucket_selection_strategy_factor",

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -196,6 +196,11 @@ boolean tryCollectFromTermFrequencies(LeafReaderContext ctx, BiConsumer<Long, In
196196
return false;
197197
}
198198

199+
long termCount = segmentTerms.size();
200+
if (termCount == -1 || termCount > context.termsAggregationMaxPrecomputeCardinality()) {
201+
return false;
202+
}
203+
199204
NumericDocValues docCountValues = DocValues.getNumeric(ctx.reader(), DocCountFieldMapper.NAME);
200205
if (docCountValues.nextDoc() != NO_MORE_DOCS) {
201206
// This segment has at least one document with the _doc_count field.

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -776,8 +776,9 @@ private boolean shouldDisableStreamingForOrdinals(SearchContext searchContext, W
776776
}
777777

778778
// Check 2: Match-all query with the majority of docs in clean segments
779-
// Traditional aggregator can use term frequency optimization for these segments
780-
if (isMatchAllQuery(searchContext.query())) {
779+
// and cardinality within the precompute threshold.
780+
// Traditional aggregator can use term frequency optimization for these segments.
781+
if (isMatchAllQuery(searchContext.query()) && maxCardinality <= searchContext.termsAggregationMaxPrecomputeCardinality()) {
781782
double cleanRatio = totalDocs > 0 ? (double) docsInCleanSegments / totalDocs : 0;
782783
return cleanRatio > 0.8;
783784
}

server/src/main/java/org/opensearch/search/internal/SearchContext.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -560,6 +560,11 @@ public int cardinalityAggregationPruningThreshold() {
560560
return 0;
561561
}
562562

563+
@ExperimentalApi
564+
public long termsAggregationMaxPrecomputeCardinality() {
565+
return 30_000L;
566+
}
567+
563568
public CardinalityAggregationContext cardinalityAggregationContext() {
564569
return new CardinalityAggregationContext(false, Runtime.getRuntime().maxMemory() / 100);
565570
}

server/src/test/java/org/opensearch/search/aggregations/FactoryStreamingCostEstimationTests.java

Lines changed: 31 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -667,6 +667,37 @@ public void testTermsFactoryStreamableNonMatchAllQuery() throws IOException {
667667
}
668668
}
669669

670+
/**
671+
* Test that match-all query with clean segments but cardinality above the precompute threshold IS streamable.
672+
* When cardinality exceeds the threshold, Check 2 is skipped because tryCollectFromTermFrequencies
673+
* would bail out on high cardinality anyway.
674+
*/
675+
public void testTermsFactoryStreamableMatchAllHighCardinality() throws IOException {
676+
try (Directory directory = newDirectory()) {
677+
try (IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig())) {
678+
// Create index with cardinality above the default threshold (30,000)
679+
for (int i = 0; i < 50000; i++) {
680+
Document doc = new Document();
681+
doc.add(new SortedSetDocValuesField("category", new BytesRef("cat_" + i)));
682+
writer.addDocument(doc);
683+
}
684+
685+
try (IndexReader reader = DirectoryReader.open(writer)) {
686+
IndexSearcher searcher = newIndexSearcher(reader);
687+
MappedFieldType fieldType = new KeywordFieldMapper.KeywordFieldType("category");
688+
689+
TermsAggregationBuilder termsBuilder = new TermsAggregationBuilder("terms").field("category").size(10);
690+
691+
// Match-all with clean segments but high cardinality - should be streamable
692+
FactoryAndContext result = createAggregatorFactoryWithQuery(termsBuilder, searcher, new MatchAllDocsQuery(), fieldType);
693+
StreamingCostMetrics metrics = ((StreamingCostEstimable) result.factory).estimateStreamingCost(result.searchContext);
694+
695+
assertTrue("Match-all with high cardinality should be streamable", metrics.streamable());
696+
}
697+
}
698+
}
699+
}
700+
670701
// ========================================
671702
// Helper methods
672703
// ========================================

server/src/test/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorTests.java

Lines changed: 90 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -329,6 +329,96 @@ public void testSimpleAggregationLowCardinality() throws Exception {
329329
testSimple(ADD_SORTED_SET_FIELD_INDEXED, false, true, true, TermsAggregatorFactory.ExecutionMode.GLOBAL_ORDINALS, 4);
330330
}
331331

332+
/**
333+
* When termsAggregationMaxPrecomputeCardinality is set to 0, tryCollectFromTermFrequencies should bail out
334+
* even when all other conditions are met (indexed fields, no deletions, no _doc_count).
335+
* This verifies the cardinality guard: with threshold=0, all documents must be visited via normal collection.
336+
*/
337+
public void testTermFrequencyCardinalityGuard() throws Exception {
338+
try (Directory directory = newDirectory()) {
339+
try (
340+
RandomIndexWriter indexWriter = new RandomIndexWriter(
341+
random(),
342+
directory,
343+
newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE)
344+
)
345+
) {
346+
List<Document> documents = new ArrayList<>();
347+
Document document = new Document();
348+
ADD_SORTED_SET_FIELD_INDEXED.apply(document, "string", "a");
349+
ADD_SORTED_SET_FIELD_INDEXED.apply(document, "string", "b");
350+
documents.add(document);
351+
352+
document = new Document();
353+
ADD_SORTED_SET_FIELD_INDEXED.apply(document, "string", "");
354+
ADD_SORTED_SET_FIELD_INDEXED.apply(document, "string", "c");
355+
ADD_SORTED_SET_FIELD_INDEXED.apply(document, "string", "a");
356+
documents.add(document);
357+
358+
document = new Document();
359+
ADD_SORTED_SET_FIELD_INDEXED.apply(document, "string", "b");
360+
ADD_SORTED_SET_FIELD_INDEXED.apply(document, "string", "d");
361+
documents.add(document);
362+
363+
document = new Document();
364+
ADD_SORTED_SET_FIELD_INDEXED.apply(document, "string", "");
365+
documents.add(document);
366+
367+
indexWriter.addDocuments(documents);
368+
369+
try (IndexReader indexReader = maybeWrapReaderEs(indexWriter.getReader())) {
370+
IndexSearcher indexSearcher = newIndexSearcher(indexReader);
371+
372+
TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("_name").userValueTypeHint(ValueType.STRING)
373+
.executionHint(TermsAggregatorFactory.ExecutionMode.GLOBAL_ORDINALS.toString())
374+
.field("string")
375+
.order(BucketOrder.key(true));
376+
MappedFieldType fieldType = new KeywordFieldMapper.KeywordFieldType("string");
377+
378+
TermsAggregatorFactory.COLLECT_SEGMENT_ORDS = false;
379+
TermsAggregatorFactory.REMAP_GLOBAL_ORDS = false;
380+
381+
// Set threshold to 0 so the cardinality guard bails out of tryCollectFromTermFrequencies
382+
CountingAggregator aggregator = new CountingAggregator(
383+
new AtomicInteger(),
384+
createAggregatorWithCustomizableSearchContext(
385+
new MatchAllDocsQuery(),
386+
aggregationBuilder,
387+
indexSearcher,
388+
createIndexSettings(),
389+
new MultiBucketConsumerService.MultiBucketConsumer(
390+
DEFAULT_MAX_BUCKETS,
391+
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
392+
),
393+
searchContext -> when(searchContext.termsAggregationMaxPrecomputeCardinality()).thenReturn(0L),
394+
fieldType
395+
)
396+
);
397+
398+
aggregator.preCollection();
399+
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
400+
aggregator.postCollection();
401+
Terms result = reduce(aggregator);
402+
assertEquals(5, result.getBuckets().size());
403+
assertEquals("", result.getBuckets().get(0).getKeyAsString());
404+
assertEquals(2L, result.getBuckets().get(0).getDocCount());
405+
assertEquals("a", result.getBuckets().get(1).getKeyAsString());
406+
assertEquals(2L, result.getBuckets().get(1).getDocCount());
407+
assertEquals("b", result.getBuckets().get(2).getKeyAsString());
408+
assertEquals(2L, result.getBuckets().get(2).getDocCount());
409+
assertEquals("c", result.getBuckets().get(3).getKeyAsString());
410+
assertEquals(1L, result.getBuckets().get(3).getDocCount());
411+
assertEquals("d", result.getBuckets().get(4).getKeyAsString());
412+
assertEquals(1L, result.getBuckets().get(4).getDocCount());
413+
414+
// With threshold=0, tryCollectFromTermFrequencies should bail out,
415+
// so all 4 documents must be visited via normal collection
416+
assertEquals(4, aggregator.getCollectCount().get());
417+
}
418+
}
419+
}
420+
}
421+
332422
/**
333423
* This test case utilizes the MapStringTermsAggregator.
334424
*/

test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -550,6 +550,7 @@ public boolean shouldCache(Query query) {
550550
fieldNameToType.putAll(getFieldAliases(fieldTypes));
551551

552552
when(searchContext.maxAggRewriteFilters()).thenReturn(10_000);
553+
when(searchContext.termsAggregationMaxPrecomputeCardinality()).thenReturn(30_000L);
553554
when(searchContext.cardinalityAggregationContext()).thenReturn(
554555
new org.opensearch.search.aggregations.metrics.CardinalityAggregationContext(false, Runtime.getRuntime().maxMemory() / 100)
555556
);

0 commit comments

Comments
 (0)