Skip to content

Commit ffe811d

Browse files
bcullyncordon
authored andcommitted
Pass split shard count summary through acquireSearcher (elastic#138259)
This will allow us to install search filters on shards being split according to whether the coordinating node is including new search shards in its requests or not. Move wrapDirectoryReader to wrapExternalDirectoryReader. After reviewing the users of SearcherScope.INTERNAL none of them appear to want filtering, so it is simpler and safer not to provide an interface for them to do so.
1 parent d23df48 commit ffe811d

File tree

12 files changed

+106
-52
lines changed

12 files changed

+106
-52
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -825,7 +825,7 @@ protected final ShardSearchRequest buildShardSearchRequest(SearchShardIterator s
825825
shardIt.getClusterAlias(),
826826
shardIt.getSearchContextId(),
827827
shardIt.getSearchContextKeepAlive(),
828-
shardIt.getReshardSplitShardCountSummary()
828+
shardIt.getSplitShardCountSummary()
829829
);
830830
// if we already received a search result we can inform the shard that it
831831
// can return a null response if the request rewrites to match none rather

server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ private CanMatchNodeRequest.Shard buildShardLevelRequest(SearchShardIterator sha
449449
shardIt.getSearchContextId(),
450450
shardIt.getSearchContextKeepAlive(),
451451
ShardSearchRequest.computeWaitForCheckpoint(request.getWaitForCheckpoints(), shardIt.shardId(), shardRequestIndex),
452-
shardIt.getReshardSplitShardCountSummary()
452+
shardIt.getSplitShardCountSummary()
453453
);
454454
}
455455

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -498,7 +498,7 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
498498
shardIndex,
499499
routing.getShardId(),
500500
shardRoutings.getSearchContextId(),
501-
shardRoutings.getReshardSplitShardCountSummary()
501+
shardRoutings.getSplitShardCountSummary()
502502
)
503503
);
504504
var filterForAlias = aliasFilter.getOrDefault(indexUUID, AliasFilter.EMPTY);

server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public final class SearchShardIterator implements Comparable<SearchShardIterator
4242
/**
4343
* Additional metadata specific to the resharding feature. See {@link org.elasticsearch.cluster.routing.SplitShardCountSummary}.
4444
*/
45-
private final SplitShardCountSummary reshardSplitShardCountSummary;
45+
private final SplitShardCountSummary splitShardCountSummary;
4646

4747
/**
4848
* Creates a {@link SearchShardIterator} instance that iterates over a set of replicas of a shard with provided <code>shardId</code>.
@@ -57,7 +57,7 @@ public SearchShardIterator(
5757
ShardId shardId,
5858
List<ShardRouting> shards,
5959
OriginalIndices originalIndices,
60-
SplitShardCountSummary reshardSplitShardCountSummary
60+
SplitShardCountSummary splitShardCountSummary
6161
) {
6262
this(
6363
clusterAlias,
@@ -68,23 +68,23 @@ public SearchShardIterator(
6868
null,
6969
false,
7070
false,
71-
reshardSplitShardCountSummary
71+
splitShardCountSummary
7272
);
7373
}
7474

7575
/**
7676
* Creates a {@link SearchShardIterator} instance that iterates over a set of nodes that are known to contain replicas of a shard
7777
* with provided <code>shardId</code>.
7878
*
79-
* @param clusterAlias the alias of the cluster where the shard is located
80-
* @param shardId shard id of the group
81-
* @param targetNodeIds the list of nodes hosting shard copies
82-
* @param originalIndices the indices that the search request originally related to (before any rewriting happened)
83-
* @param searchContextId the point-in-time specified for this group if exists
84-
* @param searchContextKeepAlive the time interval that data nodes should extend the keep alive of the point-in-time
85-
* @param prefiltered if true, then this group already executed the can_match phase
86-
* @param skip if true, then this group won't have matches, and it can be safely skipped from the search
87-
* @param reshardSplitShardCountSummary see {@link org.elasticsearch.search.internal.ShardSearchRequest#reshardSplitShardCountSummary}
79+
* @param clusterAlias the alias of the cluster where the shard is located
80+
* @param shardId shard id of the group
81+
* @param targetNodeIds the list of nodes hosting shard copies
82+
* @param originalIndices the indices that the search request originally related to (before any rewriting happened)
83+
* @param searchContextId the point-in-time specified for this group if exists
84+
* @param searchContextKeepAlive the time interval that data nodes should extend the keep alive of the point-in-time
85+
* @param prefiltered if true, then this group already executed the can_match phase
86+
* @param skip if true, then this group won't have matches, and it can be safely skipped from the search
87+
* @param splitShardCountSummary see {@link org.elasticsearch.search.internal.ShardSearchRequest#splitShardCountSummary}
8888
*/
8989
public SearchShardIterator(
9090
@Nullable String clusterAlias,
@@ -95,7 +95,7 @@ public SearchShardIterator(
9595
TimeValue searchContextKeepAlive,
9696
boolean prefiltered,
9797
boolean skip,
98-
SplitShardCountSummary reshardSplitShardCountSummary
98+
SplitShardCountSummary splitShardCountSummary
9999
) {
100100
this.shardId = shardId;
101101
this.targetNodesIterator = new PlainIterator<>(targetNodeIds);
@@ -107,7 +107,7 @@ public SearchShardIterator(
107107
this.prefiltered = prefiltered;
108108
this.skip = skip;
109109
assert skip == false || prefiltered : "only prefiltered shards are skip-able";
110-
this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
110+
this.splitShardCountSummary = splitShardCountSummary;
111111
}
112112

113113
/**
@@ -195,8 +195,8 @@ ShardId shardId() {
195195
return shardId;
196196
}
197197

198-
public SplitShardCountSummary getReshardSplitShardCountSummary() {
199-
return reshardSplitShardCountSummary;
198+
public SplitShardCountSummary getSplitShardCountSummary() {
199+
return splitShardCountSummary;
200200
}
201201

202202
@Override

server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -211,12 +211,7 @@ private static List<SearchShardsGroup> toGroups(List<SearchShardIterator> shardI
211211
List<SearchShardsGroup> groups = new ArrayList<>(shardIts.size());
212212
for (SearchShardIterator shardIt : shardIts) {
213213
groups.add(
214-
new SearchShardsGroup(
215-
shardIt.shardId(),
216-
shardIt.getTargetNodeIds(),
217-
shardIt.skip(),
218-
shardIt.getReshardSplitShardCountSummary()
219-
)
214+
new SearchShardsGroup(shardIt.shardId(), shardIt.getTargetNodeIds(), shardIt.skip(), shardIt.getSplitShardCountSummary())
220215
);
221216
}
222217
return groups;

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.elasticsearch.action.support.SubscribableListener;
4646
import org.elasticsearch.action.support.UnsafePlainActionFuture;
4747
import org.elasticsearch.cluster.node.DiscoveryNode;
48+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
4849
import org.elasticsearch.cluster.service.ClusterApplierService;
4950
import org.elasticsearch.common.bytes.BytesReference;
5051
import org.elasticsearch.common.logging.Loggers;
@@ -998,15 +999,23 @@ public final SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searche
998999
// Called before a {@link Searcher} is created, to allow subclasses to perform any stats or logging operations.
9991000
protected void onSearcherCreation(String source, SearcherScope scope) {}
10001001

1001-
// Allows subclasses to wrap the DirectoryReader before it is used to create Searchers
1002-
protected DirectoryReader wrapDirectoryReader(DirectoryReader reader) throws IOException {
1002+
// Allows subclasses to wrap the DirectoryReader before it is used to create external Searchers
1003+
protected DirectoryReader wrapExternalDirectoryReader(DirectoryReader reader, SplitShardCountSummary ignored) throws IOException {
10031004
return reader;
10041005
}
10051006

10061007
/**
10071008
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
10081009
*/
10091010
public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wrapper, SearcherScope scope) throws EngineException {
1011+
return acquireSearcherSupplier(wrapper, scope, SplitShardCountSummary.UNSET);
1012+
}
1013+
1014+
public SearcherSupplier acquireSearcherSupplier(
1015+
Function<Searcher, Searcher> wrapper,
1016+
SearcherScope scope,
1017+
SplitShardCountSummary splitShardCountSummary
1018+
) throws EngineException {
10101019
/* Acquire order here is store -> manager since we need
10111020
* to make sure that the store is not closed before
10121021
* the searcher is acquired. */
@@ -1017,15 +1026,20 @@ public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wra
10171026
try {
10181027
ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
10191028
ElasticsearchDirectoryReader acquire = referenceManager.acquire();
1020-
DirectoryReader wrappedDirectoryReader = wrapDirectoryReader(acquire);
1029+
final DirectoryReader maybeWrappedDirectoryReader;
1030+
if (scope == SearcherScope.EXTERNAL) {
1031+
maybeWrappedDirectoryReader = wrapExternalDirectoryReader(acquire, splitShardCountSummary);
1032+
} else {
1033+
maybeWrappedDirectoryReader = acquire;
1034+
}
10211035
SearcherSupplier reader = new SearcherSupplier(wrapper) {
10221036
@Override
10231037
public Searcher acquireSearcherInternal(String source) {
10241038
assert assertSearcherIsWarmedUp(source, scope);
10251039
onSearcherCreation(source, scope);
10261040
return new Searcher(
10271041
source,
1028-
wrappedDirectoryReader,
1042+
maybeWrappedDirectoryReader,
10291043
engineConfig.getSimilarity(),
10301044
engineConfig.getQueryCache(),
10311045
engineConfig.getQueryCachingPolicy(),
@@ -1070,9 +1084,18 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin
10701084
}
10711085

10721086
public Searcher acquireSearcher(String source, SearcherScope scope, Function<Searcher, Searcher> wrapper) throws EngineException {
1087+
return acquireSearcher(source, scope, SplitShardCountSummary.UNSET, wrapper);
1088+
}
1089+
1090+
public Searcher acquireSearcher(
1091+
String source,
1092+
SearcherScope scope,
1093+
SplitShardCountSummary splitShardCountSummary,
1094+
Function<Searcher, Searcher> wrapper
1095+
) throws EngineException {
10731096
SearcherSupplier releasable = null;
10741097
try {
1075-
SearcherSupplier reader = releasable = acquireSearcherSupplier(wrapper, scope);
1098+
SearcherSupplier reader = releasable = acquireSearcherSupplier(wrapper, scope, splitShardCountSummary);
10761099
Searcher searcher = reader.acquireSearcher(source);
10771100
releasable = null;
10781101
onSearcherCreation(source, scope);

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.lucene.store.Lock;
2323
import org.elasticsearch.action.ActionListener;
2424
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
25+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
2526
import org.elasticsearch.common.hash.MessageDigests;
2627
import org.elasticsearch.common.lucene.Lucene;
2728
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
@@ -617,8 +618,12 @@ public ShardLongFieldRange getRawFieldRange(String field) throws IOException {
617618
}
618619

619620
@Override
620-
public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wrapper, SearcherScope scope) throws EngineException {
621-
final SearcherSupplier delegate = super.acquireSearcherSupplier(wrapper, scope);
621+
public SearcherSupplier acquireSearcherSupplier(
622+
Function<Searcher, Searcher> wrapper,
623+
SearcherScope scope,
624+
SplitShardCountSummary splitShardCountSummary
625+
) throws EngineException {
626+
final SearcherSupplier delegate = super.acquireSearcherSupplier(wrapper, scope, splitShardCountSummary);
622627
return new SearcherSupplier(wrapper) {
623628
@Override
624629
protected void doClose() {

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.elasticsearch.cluster.routing.RecoverySource;
4545
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
4646
import org.elasticsearch.cluster.routing.ShardRouting;
47+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
4748
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
4849
import org.elasticsearch.cluster.service.ClusterApplierService;
4950
import org.elasticsearch.cluster.service.MasterService;
@@ -1712,14 +1713,29 @@ public Engine.SearcherSupplier acquireSearcherSupplier() {
17121713
return acquireSearcherSupplier(Engine.SearcherScope.EXTERNAL);
17131714
}
17141715

1716+
/**
1717+
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
1718+
* The supplier is aware of shard splits and will filter documents that have been moved to other shards
1719+
* according to the provided {@link SplitShardCountSummary}.
1720+
* @param splitShardCountSummary a summary of the shard routing state seen when the search request was created
1721+
* @return a searcher supplier
1722+
*/
1723+
public Engine.SearcherSupplier acquireExternalSearcherSupplier(SplitShardCountSummary splitShardCountSummary) {
1724+
return acquireSearcherSupplier(Engine.SearcherScope.EXTERNAL, splitShardCountSummary);
1725+
}
1726+
17151727
/**
17161728
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
17171729
*/
17181730
public Engine.SearcherSupplier acquireSearcherSupplier(Engine.SearcherScope scope) {
1731+
return acquireSearcherSupplier(scope, SplitShardCountSummary.UNSET);
1732+
}
1733+
1734+
public Engine.SearcherSupplier acquireSearcherSupplier(Engine.SearcherScope scope, SplitShardCountSummary splitShardCountSummary) {
17191735
readAllowed();
17201736
markSearcherAccessed();
17211737
final Engine engine = getEngine();
1722-
return engine.acquireSearcherSupplier(this::wrapSearcher, scope);
1738+
return engine.acquireSearcherSupplier(this::wrapSearcher, scope, splitShardCountSummary);
17231739
}
17241740

17251741
public Engine.Searcher acquireSearcher(String source) {

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1272,7 +1272,7 @@ final ReaderContext createOrGetReaderContext(ShardSearchRequest request) {
12721272
// calculated from the ids of the underlying segments of an index commit
12731273
final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
12741274
final IndexShard shard = indexService.getShard(request.shardId().id());
1275-
final Engine.SearcherSupplier searcherSupplier = shard.acquireSearcherSupplier();
1275+
final Engine.SearcherSupplier searcherSupplier = shard.acquireExternalSearcherSupplier(request.getSplitShardCountSummary());
12761276
if (contextId.sameSearcherIdsAs(searcherSupplier.getSearcherId()) == false) {
12771277
searcherSupplier.close();
12781278
throw e;
@@ -1296,7 +1296,13 @@ final ReaderContext createOrGetReaderContext(ShardSearchRequest request) {
12961296
}
12971297
final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
12981298
final IndexShard shard = indexService.getShard(request.shardId().id());
1299-
return createAndPutReaderContext(request, indexService, shard, shard.acquireSearcherSupplier(), keepAliveInMillis);
1299+
return createAndPutReaderContext(
1300+
request,
1301+
indexService,
1302+
shard,
1303+
shard.acquireExternalSearcherSupplier(request.getSplitShardCountSummary()),
1304+
keepAliveInMillis
1305+
);
13001306
}
13011307

13021308
final ReaderContext createAndPutReaderContext(
@@ -1448,7 +1454,7 @@ protected SearchContext createContext(
14481454
public SearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout) throws IOException {
14491455
final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
14501456
final IndexShard indexShard = indexService.getShard(request.shardId().getId());
1451-
final Engine.SearcherSupplier reader = indexShard.acquireSearcherSupplier();
1457+
final Engine.SearcherSupplier reader = indexShard.acquireExternalSearcherSupplier(request.getSplitShardCountSummary());
14521458
final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet(), reader.getSearcherId());
14531459
try (ReaderContext readerContext = new ReaderContext(id, indexService, indexShard, reader, -1L, true)) {
14541460
// Use ResultsType.QUERY so that the created search context can execute queries correctly.

0 commit comments

Comments
 (0)