Skip to content

Commit fe9167c

Browse files
authored
Introduce ProfileCollectorManager (#96828)
This change adds a new CollectorManager that can wrap other CollectorManagers and subsequently wraps all collectors generated by the underlying collector manager into an InternalProfileCollector. This will be used to enable inter-segment search concurrency.
1 parent 795e07c commit fe9167c

File tree

7 files changed

+201
-14
lines changed

7 files changed

+201
-14
lines changed

server/src/main/java/org/elasticsearch/search/profile/Profilers.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public SearchProfileQueryPhaseResult buildQueryPhaseResults() {
6565
QueryProfileShardResult result = new QueryProfileShardResult(
6666
queryProfiler.getTree(),
6767
queryProfiler.getRewriteTime(),
68-
queryProfiler.getCollector()
68+
queryProfiler.getCollectorResult()
6969
);
7070
AggregationProfileShardResult aggResults = new AggregationProfileShardResult(aggProfiler.getTree());
7171
return new SearchProfileQueryPhaseResult(Collections.singletonList(result), aggResults);

server/src/main/java/org/elasticsearch/search/profile/dfs/DfsProfiler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public void stopTimer(DfsTimingType dfsTimingType) {
5454

5555
public QueryProfiler addQueryProfiler(InternalProfileCollectorManager collectorManager) {
5656
QueryProfiler queryProfiler = new QueryProfiler();
57-
queryProfiler.setCollectorManager(collectorManager);
57+
queryProfiler.setCollectorManager(collectorManager::getCollectorTree);
5858
knnQueryProfilers.add(queryProfiler);
5959
collectorManagerSet = true;
6060
return queryProfiler;
@@ -74,7 +74,7 @@ public SearchProfileDfsPhaseResult buildDfsPhaseResults() {
7474
queryProfileShardResult = new ArrayList<>(knnQueryProfilers.size());
7575
for (QueryProfiler queryProfiler : knnQueryProfilers) {
7676
queryProfileShardResult.add(
77-
new QueryProfileShardResult(queryProfiler.getTree(), queryProfiler.getRewriteTime(), queryProfiler.getCollector())
77+
new QueryProfileShardResult(queryProfiler.getTree(), queryProfiler.getRewriteTime(), queryProfiler.getCollectorResult())
7878
);
7979
}
8080
} else {

server/src/main/java/org/elasticsearch/search/profile/query/InternalProfileCollector.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,27 @@
2121
* - collect()
2222
* - doSetNextReader()
2323
* - needsScores()
24-
*
24+
* <p>
2525
* InternalProfiler facilitates the linking of the Collector graph
2626
*/
2727
public class InternalProfileCollector extends ProfilerCollector {
2828

2929
private final InternalProfileCollector[] children;
30+
private final Collector wrappedCollector;
3031

3132
public InternalProfileCollector(Collector collector, String reason, InternalProfileCollector... children) {
3233
super(collector, reason, Arrays.asList(children));
34+
this.wrappedCollector = collector;
3335
this.children = children;
3436
}
3537

38+
public Collector getWrappedCollector() {
39+
return wrappedCollector;
40+
}
41+
3642
/**
3743
* Creates a human-friendly representation of the Collector name.
38-
*
44+
* <p>
3945
* InternalBucket Collectors use the aggregation name in their toString() method,
4046
* which makes the profiled output a bit nicer.
4147
*
@@ -46,7 +52,7 @@ public InternalProfileCollector(Collector collector, String reason, InternalProf
4652
protected String deriveCollectorName(Collector c) {
4753
String s = c.getClass().getSimpleName();
4854

49-
// MutiCollector which wraps multiple BucketCollectors is generated
55+
// MultiCollector which wraps multiple BucketCollectors is generated
5056
// via an anonymous class, so this corrects the lack of a name by
5157
// asking the enclosingClass
5258
if (s.equals("")) {
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.search.profile.query;
10+
11+
import org.apache.lucene.search.Collector;
12+
import org.apache.lucene.search.CollectorManager;
13+
14+
import java.io.IOException;
15+
import java.util.Collection;
16+
import java.util.List;
17+
import java.util.stream.Collectors;
18+
19+
/**
20+
* A {@link CollectorManager} that takes another CollectorManager as input and wraps all Collectors generated by it
21+
* in an {@link InternalProfileCollector}. It delegates all the profiling to the generated collectors via {@link #getCollectorTree()}
22+
* and joins them up when its {@link #reduce} method is called. The profile result can
23+
*/
24+
public final class ProfileCollectorManager implements CollectorManager<InternalProfileCollector, Void> {
25+
26+
private final CollectorManager<Collector, ?> collectorManager;
27+
private final String reason;
28+
private CollectorResult collectorTree;
29+
30+
@SuppressWarnings("unchecked")
31+
public ProfileCollectorManager(CollectorManager<? extends Collector, ?> collectorManager, String reason) {
32+
this.collectorManager = (CollectorManager<Collector, ?>) collectorManager;
33+
this.reason = reason;
34+
}
35+
36+
@Override
37+
public InternalProfileCollector newCollector() throws IOException {
38+
return new InternalProfileCollector(collectorManager.newCollector(), reason);
39+
}
40+
41+
public Void reduce(Collection<InternalProfileCollector> profileCollectors) throws IOException {
42+
List<Collector> unwrapped = profileCollectors.stream()
43+
.map(InternalProfileCollector::getWrappedCollector)
44+
.collect(Collectors.toList());
45+
collectorManager.reduce(unwrapped);
46+
47+
List<CollectorResult> resultsPerProfiler = profileCollectors.stream()
48+
.map(ipc -> ipc.getCollectorTree())
49+
.collect(Collectors.toList());
50+
this.collectorTree = new CollectorResult(this.getClass().getSimpleName(), "segment_search", 0, resultsPerProfiler);
51+
return null;
52+
}
53+
54+
public CollectorResult getCollectorTree() {
55+
if (this.collectorTree == null) {
56+
throw new IllegalStateException("A collectorTree hasn't been set yet, call reduce() before attempting to retrieve it");
57+
}
58+
return this.collectorTree;
59+
}
60+
}

server/src/main/java/org/elasticsearch/search/profile/query/QueryProfiler.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.search.profile.AbstractProfiler;
1313

1414
import java.util.Objects;
15+
import java.util.function.Supplier;
1516

1617
/**
1718
* This class acts as a thread-local storage for profiling a query. It also
@@ -29,18 +30,18 @@ public final class QueryProfiler extends AbstractProfiler<QueryProfileBreakdown,
2930
/**
3031
* The root Collector used in the search
3132
*/
32-
private InternalProfileCollectorManager collectorManager;
33+
private Supplier<CollectorResult> collectorResultSupplier;
3334

3435
public QueryProfiler() {
3536
super(new InternalQueryProfileTree());
3637
}
3738

3839
/** Set the collector manager that is associated with this profiler. */
39-
public void setCollectorManager(InternalProfileCollectorManager collectorManager) {
40-
if (this.collectorManager != null) {
41-
throw new IllegalStateException("The collector manager can only be set once.");
40+
public void setCollectorManager(Supplier<CollectorResult> collectorResultSupplier) {
41+
if (this.collectorResultSupplier != null) {
42+
throw new IllegalStateException("The collector result supplier can only be set once.");
4243
}
43-
this.collectorManager = Objects.requireNonNull(collectorManager);
44+
this.collectorResultSupplier = Objects.requireNonNull(collectorResultSupplier);
4445
}
4546

4647
/**
@@ -71,8 +72,8 @@ public long getRewriteTime() {
7172
/**
7273
* Return the current root Collector for this search
7374
*/
74-
public CollectorResult getCollector() {
75-
return collectorManager.getCollectorTree();
75+
public CollectorResult getCollectorResult() {
76+
return this.collectorResultSupplier.get();
7677
}
7778

7879
}

server/src/main/java/org/elasticsearch/search/query/QueryPhase.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,9 @@ private static void searchWithCollectorManager(
312312
boolean timeoutSet
313313
) throws IOException {
314314
if (searchContext.getProfilers() != null) {
315-
searchContext.getProfilers().getCurrentQueryProfiler().setCollectorManager((InternalProfileCollectorManager) collectorManager);
315+
searchContext.getProfilers()
316+
.getCurrentQueryProfiler()
317+
.setCollectorManager(((InternalProfileCollectorManager) collectorManager)::getCollectorTree);
316318
}
317319
QuerySearchResult queryResult = searchContext.queryResult();
318320
try {
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.search.profile.query;
10+
11+
import org.apache.lucene.document.Document;
12+
import org.apache.lucene.document.Field;
13+
import org.apache.lucene.document.StringField;
14+
import org.apache.lucene.index.IndexReader;
15+
import org.apache.lucene.sandbox.search.ProfilerCollectorResult;
16+
import org.apache.lucene.search.Collector;
17+
import org.apache.lucene.search.CollectorManager;
18+
import org.apache.lucene.search.IndexSearcher;
19+
import org.apache.lucene.search.MatchAllDocsQuery;
20+
import org.apache.lucene.search.TopDocs;
21+
import org.apache.lucene.search.TopScoreDocCollector;
22+
import org.apache.lucene.search.similarities.BM25Similarity;
23+
import org.apache.lucene.store.Directory;
24+
import org.apache.lucene.tests.index.RandomIndexWriter;
25+
import org.apache.lucene.tests.search.DummyTotalHitCountCollector;
26+
import org.apache.lucene.util.SetOnce;
27+
import org.elasticsearch.test.ESTestCase;
28+
29+
import java.io.IOException;
30+
import java.util.Collection;
31+
import java.util.Collections;
32+
import java.util.List;
33+
34+
public class ProfileCollectorManagerTests extends ESTestCase {
35+
36+
private static class TestCollector extends DummyTotalHitCountCollector {
37+
38+
private final int id;
39+
40+
TestCollector(int id) {
41+
this.id = id;
42+
}
43+
}
44+
45+
/**
46+
* This test checks that each new collector is a different instance on each call and that
47+
* the call to reduce() is forwarded to the wrapped collector manager.
48+
*/
49+
public void testBasic() throws IOException {
50+
final SetOnce<Boolean> reduceCalled = new SetOnce<>();
51+
ProfileCollectorManager pcm = new ProfileCollectorManager(new CollectorManager<>() {
52+
53+
private static int counter = 0;
54+
55+
@Override
56+
public Collector newCollector() {
57+
return new TestCollector(counter++);
58+
}
59+
60+
@Override
61+
public Void reduce(Collection<Collector> collectors) {
62+
reduceCalled.set(true);
63+
return null;
64+
}
65+
}, CollectorResult.REASON_SEARCH_TOP_HITS);
66+
for (int i = 0; i < randomIntBetween(5, 10); i++) {
67+
InternalProfileCollector internalProfileCollector = pcm.newCollector();
68+
assertEquals(i, ((TestCollector) internalProfileCollector.getWrappedCollector()).id);
69+
}
70+
pcm.reduce(Collections.emptyList());
71+
assertTrue(reduceCalled.get());
72+
}
73+
74+
/**
75+
* This test checks functionality with potentially more than one slice on a real searcher,
76+
* wrapping a {@link TopScoreDocCollector} into {@link ProfileCollectorManager} and checking the
77+
* result from calling the collector tree contains profile results for each slice.
78+
*/
79+
public void testManagerWithSearcher() throws IOException {
80+
Directory directory = newDirectory();
81+
try (RandomIndexWriter writer = new RandomIndexWriter(random(), directory, newIndexWriterConfig())) {
82+
int numDocs = randomIntBetween(900, 1000);
83+
for (int i = 0; i < numDocs; i++) {
84+
Document doc = new Document();
85+
doc.add(new StringField("field1", "value", Field.Store.NO));
86+
writer.addDocument(doc);
87+
}
88+
writer.flush();
89+
IndexReader reader = writer.getReader();
90+
IndexSearcher searcher = newSearcher(reader);
91+
int numSlices = searcher.getSlices() == null ? 1 : searcher.getSlices().length;
92+
searcher.setSimilarity(new BM25Similarity());
93+
94+
CollectorManager<TopScoreDocCollector, TopDocs> topDocsManager = TopScoreDocCollector.createSharedManager(10, null, 1000);
95+
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), topDocsManager);
96+
assertEquals(numDocs, topDocs.totalHits.value);
97+
98+
String profileReason = "profiler_reason";
99+
ProfileCollectorManager profileCollectorManager = new ProfileCollectorManager(topDocsManager, profileReason);
100+
101+
searcher.search(new MatchAllDocsQuery(), profileCollectorManager);
102+
103+
CollectorResult parent = profileCollectorManager.getCollectorTree();
104+
assertEquals("ProfileCollectorManager", parent.getName());
105+
assertEquals("segment_search", parent.getReason());
106+
assertEquals(0, parent.getTime());
107+
List<ProfilerCollectorResult> delegateCollectorResults = parent.getProfiledChildren();
108+
assertEquals(numSlices, delegateCollectorResults.size());
109+
for (ProfilerCollectorResult pcr : delegateCollectorResults) {
110+
assertEquals("SimpleTopScoreDocCollector", pcr.getName());
111+
assertEquals(profileReason, pcr.getReason());
112+
assertTrue(pcr.getTime() > 0);
113+
}
114+
reader.close();
115+
}
116+
directory.close();
117+
}
118+
}

0 commit comments

Comments
 (0)