Skip to content

Commit f1c94af

Browse files
authored
Add a flag to control the concurrent execution of aggregations (#96023)
1 parent f01f07f commit f1c94af

File tree

10 files changed

+201
-26
lines changed

10 files changed

+201
-26
lines changed

server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,18 @@ public boolean isInSortOrderExecutionRequired() {
219219
return false;
220220
}
221221

222+
/**
223+
* Return false if this aggregation or any of the child aggregations does not support concurrent search
224+
*/
225+
public boolean supportsConcurrentExecution() {
226+
for (AggregationBuilder builder : factoriesBuilder.getAggregatorFactories()) {
227+
if (builder.supportsConcurrentExecution() == false) {
228+
return false;
229+
}
230+
}
231+
return isInSortOrderExecutionRequired() == false;
232+
}
233+
222234
/**
223235
* Called by aggregations whose parents must be sequentially ordered.
224236
* @param type the type of the aggregation being validated

server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ public boolean supportsSampling() {
9696
return true;
9797
}
9898

99+
@Override
100+
public boolean supportsConcurrentExecution() {
101+
return false;
102+
}
103+
99104
public CompositeAggregationBuilder(StreamInput in) throws IOException {
100105
super(in);
101106
int num = in.readVInt();

server/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregationBuilder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,11 @@ public BucketCardinality bucketCardinality() {
8080
return BucketCardinality.ONE;
8181
}
8282

83+
@Override
84+
public boolean supportsConcurrentExecution() {
85+
return false;
86+
}
87+
8388
@Override
8489
protected AggregatorFactory doBuild(AggregationContext context, AggregatorFactory parent, Builder subFactoriesBuilder)
8590
throws IOException {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregationBuilder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,11 @@ public boolean supportsSampling() {
133133
return true;
134134
}
135135

136+
@Override
137+
public boolean supportsConcurrentExecution() {
138+
return false;
139+
}
140+
136141
@Override
137142
protected ValuesSourceType defaultValueSourceType() {
138143
return CoreValuesSourceType.KEYWORD;

server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregationBuilder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,11 @@ public boolean supportsSampling() {
112112
return true;
113113
}
114114

115+
@Override
116+
public boolean supportsConcurrentExecution() {
117+
return false;
118+
}
119+
115120
@Override
116121
protected boolean serializeTargetValueType(TransportVersion version) {
117122
return true;

server/src/test/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregatorTests.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,32 @@ public void testIndexedMultiValuedString() throws IOException {
581581
);
582582
}
583583

584+
public void testIndexedAllDifferentValues() throws IOException {
585+
// Indexing enables testing of ordinal values
586+
final CardinalityAggregationBuilder aggregationBuilder = new CardinalityAggregationBuilder("name").field("str_values");
587+
final MappedFieldType mappedFieldTypes = new KeywordFieldMapper.KeywordFieldType("str_values");
588+
int docs = randomIntBetween(50, 100);
589+
CheckedConsumer<RandomIndexWriter, IOException> buildIndex = iw -> {
590+
591+
for (int i = 0; i < docs; i++) {
592+
iw.addDocument(
593+
List.of(
594+
new StringField("str_values", "" + i, Field.Store.NO),
595+
new SortedSetDocValuesField("str_values", new BytesRef("" + i))
596+
)
597+
);
598+
if (rarely()) {
599+
iw.commit();
600+
}
601+
}
602+
};
603+
604+
testAggregation(aggregationBuilder, new MatchAllDocsQuery(), buildIndex, card -> {
605+
assertEquals(docs, card.getValue());
606+
assertTrue(AggregationInspectionHelper.hasValue(card));
607+
}, mappedFieldTypes);
608+
}
609+
584610
public void testUnmappedMissingString() throws IOException {
585611
CardinalityAggregationBuilder aggregationBuilder = new CardinalityAggregationBuilder("name").field("number").missing("🍌🍌🍌");
586612

test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java

Lines changed: 86 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.lucene.index.NoMergePolicy;
2525
import org.apache.lucene.sandbox.document.HalfFloatPoint;
2626
import org.apache.lucene.search.Collector;
27+
import org.apache.lucene.search.CollectorManager;
2728
import org.apache.lucene.search.IndexSearcher;
2829
import org.apache.lucene.search.MatchAllDocsQuery;
2930
import org.apache.lucene.search.Query;
@@ -58,6 +59,7 @@
5859
import org.elasticsearch.common.util.BigArrays;
5960
import org.elasticsearch.common.util.MockBigArrays;
6061
import org.elasticsearch.common.util.MockPageCacheRecycler;
62+
import org.elasticsearch.common.util.concurrent.EsExecutors;
6163
import org.elasticsearch.core.CheckedConsumer;
6264
import org.elasticsearch.core.Releasable;
6365
import org.elasticsearch.core.Releasables;
@@ -136,6 +138,8 @@
136138
import org.elasticsearch.search.internal.SubSearchContext;
137139
import org.elasticsearch.test.ESTestCase;
138140
import org.elasticsearch.test.InternalAggregationTestCase;
141+
import org.elasticsearch.threadpool.TestThreadPool;
142+
import org.elasticsearch.threadpool.ThreadPool;
139143
import org.elasticsearch.xcontent.ContextParser;
140144
import org.elasticsearch.xcontent.XContentBuilder;
141145
import org.junit.After;
@@ -145,13 +149,16 @@
145149
import java.net.InetAddress;
146150
import java.util.ArrayList;
147151
import java.util.Arrays;
152+
import java.util.Collection;
148153
import java.util.Collections;
149154
import java.util.HashMap;
150155
import java.util.HashSet;
151156
import java.util.List;
152157
import java.util.Map;
153158
import java.util.Optional;
154159
import java.util.Set;
160+
import java.util.concurrent.ExecutionException;
161+
import java.util.concurrent.ThreadPoolExecutor;
155162
import java.util.function.BiFunction;
156163
import java.util.function.Consumer;
157164
import java.util.stream.Stream;
@@ -161,7 +168,6 @@
161168
import static java.util.stream.Collectors.toList;
162169
import static org.elasticsearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS;
163170
import static org.hamcrest.Matchers.equalTo;
164-
import static org.hamcrest.Matchers.in;
165171
import static org.hamcrest.Matchers.instanceOf;
166172
import static org.hamcrest.Matchers.not;
167173
import static org.hamcrest.Matchers.sameInstance;
@@ -177,7 +183,7 @@
177183
*/
178184
public abstract class AggregatorTestCase extends ESTestCase {
179185
private NamedWriteableRegistry namedWriteableRegistry;
180-
private List<AggregationContext> releasables = new ArrayList<>();
186+
private final List<AggregationContext> releasables = new ArrayList<>();
181187
protected ValuesSourceRegistry valuesSourceRegistry;
182188
private AnalysisModule analysisModule;
183189

@@ -192,9 +198,21 @@ public abstract class AggregatorTestCase extends ESTestCase {
192198
CompletionFieldMapper.CONTENT_TYPE, // TODO support completion
193199
FieldAliasMapper.CONTENT_TYPE // TODO support alias
194200
);
201+
ThreadPool threadPool;
202+
ThreadPoolExecutor threadPoolExecutor;
195203

196204
@Before
197205
public final void initPlugins() {
206+
int numThreads = randomIntBetween(2, 4);
207+
threadPool = new TestThreadPool(AggregatorTestCase.class.getName());
208+
threadPoolExecutor = EsExecutors.newFixed(
209+
"test",
210+
numThreads,
211+
10,
212+
EsExecutors.daemonThreadFactory("test"),
213+
threadPool.getThreadContext(),
214+
randomBoolean()
215+
);
198216
List<SearchPlugin> plugins = new ArrayList<>(getSearchPlugins());
199217
plugins.add(new AggCardinalityUpperBoundPlugin());
200218
SearchModule searchModule = new SearchModule(Settings.EMPTY, plugins);
@@ -475,7 +493,14 @@ private void runWithCrankyCircuitBreaker(IndexSettings indexSettings, IndexSearc
475493
} catch (CircuitBreakingException e) {
476494
// Circuit breaks from the cranky breaker are expected - it randomly fails, after all
477495
assertThat(e.getMessage(), equalTo(CrankyCircuitBreakerService.ERROR_MESSAGE));
478-
} catch (IOException e) {
496+
} catch (RuntimeException e) {
497+
if (e.getCause() instanceof ExecutionException executionException) {
498+
if (executionException.getCause() instanceof CircuitBreakingException circuitBreakingException) {
499+
// Circuit breaks from the cranky breaker are expected - it randomly fails, after all
500+
assertThat(circuitBreakingException.getMessage(), equalTo(CrankyCircuitBreakerService.ERROR_MESSAGE));
501+
return;
502+
}
503+
}
479504
throw e;
480505
}
481506
}
@@ -497,7 +522,7 @@ private <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
497522

498523
final IndexReaderContext ctx = searcher.getTopReaderContext();
499524
final PipelineTree pipelines = builder.buildPipelineTree();
500-
List<InternalAggregation> aggs = new ArrayList<>();
525+
List<InternalAggregation> internalAggs = new ArrayList<>();
501526
Query rewritten = searcher.rewrite(query);
502527

503528
if (splitLeavesIntoSeparateAggregators
@@ -533,7 +558,7 @@ private <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
533558
}
534559
a.postCollection();
535560
assertEquals(shouldBeCached, context.isCacheable());
536-
aggs.add(a.buildTopLevel());
561+
internalAggs.add(a.buildTopLevel());
537562
} finally {
538563
Releasables.close(context);
539564
}
@@ -550,39 +575,61 @@ private <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
550575
fieldTypes
551576
);
552577
try {
553-
C root = createAggregator(builder, context);
554-
root.preCollection();
578+
List<C> aggregators = new ArrayList<>();
555579
if (context.isInSortOrderExecutionRequired()) {
580+
C root = createAggregator(builder, context);
581+
root.preCollection();
582+
aggregators.add(root);
556583
new TimeSeriesIndexSearcher(searcher, List.of()).search(rewritten, MultiBucketCollector.wrap(true, List.of(root)));
557584
} else {
558-
searcher.search(rewritten, MultiBucketCollector.wrap(true, List.of(root)).asCollector());
585+
CollectorManager<Collector, Void> collectorManager = new CollectorManager<>() {
586+
@Override
587+
public Collector newCollector() throws IOException {
588+
C collector = createAggregator(builder, context);
589+
collector.preCollection();
590+
aggregators.add(collector);
591+
return MultiBucketCollector.wrap(true, List.of(collector)).asCollector();
592+
}
593+
594+
@Override
595+
public Void reduce(Collection<Collector> collectors) {
596+
return null;
597+
}
598+
};
599+
if (aggTestConfig.builder().supportsConcurrentExecution()) {
600+
searcher.search(rewritten, collectorManager);
601+
} else {
602+
searcher.search(rewritten, collectorManager.newCollector());
603+
}
604+
}
605+
for (C agg : aggregators) {
606+
agg.postCollection();
607+
internalAggs.add(agg.buildTopLevel());
559608
}
560-
root.postCollection();
561-
aggs.add(root.buildTopLevel());
562609
} finally {
563610
Releasables.close(context);
564611
}
565612
}
566-
assertRoundTrip(aggs);
613+
assertRoundTrip(internalAggs);
567614

568615
BigArrays bigArraysForReduction = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), breakerService);
569616
try {
570-
if (aggTestConfig.incrementalReduce() && aggs.size() > 1) {
617+
if (aggTestConfig.incrementalReduce() && internalAggs.size() > 1) {
571618
// sometimes do an incremental reduce
572-
int toReduceSize = aggs.size();
573-
Collections.shuffle(aggs, random());
619+
int toReduceSize = internalAggs.size();
620+
Collections.shuffle(internalAggs, random());
574621
int r = randomIntBetween(1, toReduceSize);
575-
List<InternalAggregation> toReduce = aggs.subList(0, r);
622+
List<InternalAggregation> toReduce = internalAggs.subList(0, r);
576623
AggregationReduceContext reduceContext = new AggregationReduceContext.ForPartial(
577624
bigArraysForReduction,
578625
getMockScriptService(),
579626
() -> false,
580627
builder
581628
);
582-
A reduced = (A) aggs.get(0).reduce(toReduce, reduceContext);
583-
aggs = new ArrayList<>(aggs.subList(r, toReduceSize));
584-
aggs.add(reduced);
585-
assertRoundTrip(aggs);
629+
A reduced = (A) internalAggs.get(0).reduce(toReduce, reduceContext);
630+
internalAggs = new ArrayList<>(internalAggs.subList(r, toReduceSize));
631+
internalAggs.add(reduced);
632+
assertRoundTrip(internalAggs);
586633
}
587634

588635
// now do the final reduce
@@ -600,7 +647,7 @@ private <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
600647
);
601648

602649
@SuppressWarnings("unchecked")
603-
A internalAgg = (A) aggs.get(0).reduce(aggs, reduceContext);
650+
A internalAgg = (A) internalAggs.get(0).reduce(internalAggs, reduceContext);
604651
assertRoundTrip(internalAgg);
605652

606653
// materialize any parent pipelines
@@ -870,16 +917,28 @@ protected static DirectoryReader wrapInMockESDirectoryReader(DirectoryReader dir
870917
}
871918

872919
/**
873-
* Added to randomly run with more assertions on the index searcher level,
874-
* like {@link org.apache.lucene.tests.util.LuceneTestCase#newSearcher(IndexReader)}, which can't be used because it also
875-
* wraps in the IndexSearcher's IndexReader with other implementations that we can't handle. (e.g. ParallelCompositeReader)
920+
* Creates a {@link ContextIndexSearcher} that supports concurrency running each segment in a different thread. It randomly
921+
* sets the IndexSearcher to run on concurrent mode.
876922
*/
877-
protected static IndexSearcher newIndexSearcher(DirectoryReader indexReader) throws IOException {
923+
protected IndexSearcher newIndexSearcher(DirectoryReader indexReader) throws IOException {
878924
if (randomBoolean()) {
879925
// this executes basic query checks and asserts that weights are normalized only once etc.
880926
return new AssertingIndexSearcher(random(), indexReader);
881927
} else {
882-
return new IndexSearcher(indexReader);
928+
return new ContextIndexSearcher(
929+
indexReader,
930+
IndexSearcher.getDefaultSimilarity(),
931+
IndexSearcher.getDefaultQueryCache(),
932+
IndexSearcher.getDefaultQueryCachingPolicy(),
933+
randomBoolean(),
934+
this.threadPoolExecutor
935+
) {
936+
@Override
937+
protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
938+
// get a thread per segment
939+
return slices(leaves, 1, 1);
940+
}
941+
};
883942
}
884943
}
885944

@@ -1179,6 +1238,8 @@ public IndexAnalyzers getIndexAnalyzers() {
11791238
public void cleanupReleasables() {
11801239
Releasables.close(releasables);
11811240
releasables.clear();
1241+
threadPoolExecutor.shutdown();
1242+
terminate(threadPool);
11821243
}
11831244

11841245
/**

test/framework/src/main/java/org/elasticsearch/search/aggregations/BaseAggregationTestCase.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.common.io.stream.BytesStreamOutput;
1313
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
1414
import org.elasticsearch.common.io.stream.StreamInput;
15+
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
1516
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
1617
import org.elasticsearch.test.AbstractBuilderTestCase;
1718
import org.elasticsearch.xcontent.ToXContent;
@@ -27,6 +28,7 @@
2728
import java.util.Set;
2829

2930
import static org.elasticsearch.test.EqualsHashCodeTestUtils.checkEqualsAndHashCode;
31+
import static org.hamcrest.Matchers.equalTo;
3032
import static org.hamcrest.Matchers.hasSize;
3133

3234
public abstract class BaseAggregationTestCase<AB extends AbstractAggregationBuilder<AB>> extends AbstractBuilderTestCase {
@@ -56,6 +58,15 @@ public void testFromXContent() throws IOException {
5658
assertEquals(testAgg.hashCode(), newAgg.hashCode());
5759
}
5860

61+
public void testSupportsConcurrentExecution() {
62+
AB builder = createTestAggregatorBuilder();
63+
boolean supportsConcurrency = builder.supportsConcurrentExecution();
64+
AggregationBuilder bucketBuilder = new HistogramAggregationBuilder("test");
65+
assertThat(bucketBuilder.supportsConcurrentExecution(), equalTo(true));
66+
bucketBuilder.subAggregation(builder);
67+
assertThat(bucketBuilder.supportsConcurrentExecution(), equalTo(supportsConcurrency));
68+
}
69+
5970
/**
6071
* Create at least 2 aggregations and test equality and hash
6172
*/

x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregationBuilder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,11 @@ public boolean supportsSampling() {
152152
return true;
153153
}
154154

155+
@Override
156+
public boolean supportsConcurrentExecution() {
157+
return false;
158+
}
159+
155160
/**
156161
* Sets the field to use for this aggregation.
157162
*/

0 commit comments

Comments
 (0)