Skip to content

Commit d9ef4f7

Browse files
committed
Pass split shard count summary through acquireSearcher
This will allow us to install search filters on shards being split according to whether the coordinating node is including new search shards in its requests or not.
1 parent d74334e commit d9ef4f7

File tree

10 files changed

+78
-41
lines changed

10 files changed

+78
-41
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -825,7 +825,7 @@ protected final ShardSearchRequest buildShardSearchRequest(SearchShardIterator s
825825
shardIt.getClusterAlias(),
826826
shardIt.getSearchContextId(),
827827
shardIt.getSearchContextKeepAlive(),
828-
shardIt.getReshardSplitShardCountSummary()
828+
shardIt.getSplitShardCountSummary()
829829
);
830830
// if we already received a search result we can inform the shard that it
831831
// can return a null response if the request rewrites to match none rather

server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ private CanMatchNodeRequest.Shard buildShardLevelRequest(SearchShardIterator sha
449449
shardIt.getSearchContextId(),
450450
shardIt.getSearchContextKeepAlive(),
451451
ShardSearchRequest.computeWaitForCheckpoint(request.getWaitForCheckpoints(), shardIt.shardId(), shardRequestIndex),
452-
shardIt.getReshardSplitShardCountSummary()
452+
shardIt.getSplitShardCountSummary()
453453
);
454454
}
455455

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -498,7 +498,7 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
498498
shardIndex,
499499
routing.getShardId(),
500500
shardRoutings.getSearchContextId(),
501-
shardRoutings.getReshardSplitShardCountSummary()
501+
shardRoutings.getSplitShardCountSummary()
502502
)
503503
);
504504
var filterForAlias = aliasFilter.getOrDefault(indexUUID, AliasFilter.EMPTY);

server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public final class SearchShardIterator implements Comparable<SearchShardIterator
4242
/**
4343
* Additional metadata specific to the resharding feature. See {@link org.elasticsearch.cluster.routing.SplitShardCountSummary}.
4444
*/
45-
private final SplitShardCountSummary reshardSplitShardCountSummary;
45+
private final SplitShardCountSummary splitShardCountSummary;
4646

4747
/**
4848
* Creates a {@link SearchShardIterator} instance that iterates over a set of replicas of a shard with provided <code>shardId</code>.
@@ -57,7 +57,7 @@ public SearchShardIterator(
5757
ShardId shardId,
5858
List<ShardRouting> shards,
5959
OriginalIndices originalIndices,
60-
SplitShardCountSummary reshardSplitShardCountSummary
60+
SplitShardCountSummary splitShardCountSummary
6161
) {
6262
this(
6363
clusterAlias,
@@ -68,23 +68,23 @@ public SearchShardIterator(
6868
null,
6969
false,
7070
false,
71-
reshardSplitShardCountSummary
71+
splitShardCountSummary
7272
);
7373
}
7474

7575
/**
7676
* Creates a {@link SearchShardIterator} instance that iterates over a set of nodes that are known to contain replicas of a shard
7777
* with provided <code>shardId</code>.
7878
*
79-
* @param clusterAlias the alias of the cluster where the shard is located
80-
* @param shardId shard id of the group
81-
* @param targetNodeIds the list of nodes hosting shard copies
82-
* @param originalIndices the indices that the search request originally related to (before any rewriting happened)
83-
* @param searchContextId the point-in-time specified for this group if exists
84-
* @param searchContextKeepAlive the time interval that data nodes should extend the keep alive of the point-in-time
85-
* @param prefiltered if true, then this group already executed the can_match phase
86-
* @param skip if true, then this group won't have matches, and it can be safely skipped from the search
87-
* @param reshardSplitShardCountSummary see {@link org.elasticsearch.search.internal.ShardSearchRequest#reshardSplitShardCountSummary}
79+
* @param clusterAlias the alias of the cluster where the shard is located
80+
* @param shardId shard id of the group
81+
* @param targetNodeIds the list of nodes hosting shard copies
82+
* @param originalIndices the indices that the search request originally related to (before any rewriting happened)
83+
* @param searchContextId the point-in-time specified for this group if exists
84+
* @param searchContextKeepAlive the time interval that data nodes should extend the keep alive of the point-in-time
85+
* @param prefiltered if true, then this group already executed the can_match phase
86+
* @param skip if true, then this group won't have matches, and it can be safely skipped from the search
87+
* @param splitShardCountSummary see {@link org.elasticsearch.search.internal.ShardSearchRequest#splitShardCountSummary}
8888
*/
8989
public SearchShardIterator(
9090
@Nullable String clusterAlias,
@@ -95,7 +95,7 @@ public SearchShardIterator(
9595
TimeValue searchContextKeepAlive,
9696
boolean prefiltered,
9797
boolean skip,
98-
SplitShardCountSummary reshardSplitShardCountSummary
98+
SplitShardCountSummary splitShardCountSummary
9999
) {
100100
this.shardId = shardId;
101101
this.targetNodesIterator = new PlainIterator<>(targetNodeIds);
@@ -107,7 +107,7 @@ public SearchShardIterator(
107107
this.prefiltered = prefiltered;
108108
this.skip = skip;
109109
assert skip == false || prefiltered : "only prefiltered shards are skip-able";
110-
this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
110+
this.splitShardCountSummary = splitShardCountSummary;
111111
}
112112

113113
/**
@@ -195,8 +195,8 @@ ShardId shardId() {
195195
return shardId;
196196
}
197197

198-
public SplitShardCountSummary getReshardSplitShardCountSummary() {
199-
return reshardSplitShardCountSummary;
198+
public SplitShardCountSummary getSplitShardCountSummary() {
199+
return splitShardCountSummary;
200200
}
201201

202202
@Override

server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ private static List<SearchShardsGroup> toGroups(List<SearchShardIterator> shardI
215215
shardIt.shardId(),
216216
shardIt.getTargetNodeIds(),
217217
shardIt.skip(),
218-
shardIt.getReshardSplitShardCountSummary()
218+
shardIt.getSplitShardCountSummary()
219219
)
220220
);
221221
}

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.elasticsearch.action.support.SubscribableListener;
4646
import org.elasticsearch.action.support.UnsafePlainActionFuture;
4747
import org.elasticsearch.cluster.node.DiscoveryNode;
48+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
4849
import org.elasticsearch.cluster.service.ClusterApplierService;
4950
import org.elasticsearch.common.bytes.BytesReference;
5051
import org.elasticsearch.common.logging.Loggers;
@@ -999,14 +1000,22 @@ public final SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searche
9991000
protected void onSearcherCreation(String source, SearcherScope scope) {}
10001001

10011002
// Allows subclasses to wrap the DirectoryReader before it is used to create Searchers
1002-
protected DirectoryReader wrapDirectoryReader(DirectoryReader reader) throws IOException {
1003+
protected DirectoryReader wrapDirectoryReader(DirectoryReader reader, SplitShardCountSummary ignored) throws IOException {
10031004
return reader;
10041005
}
10051006

10061007
/**
10071008
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
10081009
*/
10091010
public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wrapper, SearcherScope scope) throws EngineException {
1011+
return acquireSearcherSupplier(wrapper, scope, SplitShardCountSummary.UNSET);
1012+
}
1013+
1014+
public SearcherSupplier acquireSearcherSupplier(
1015+
Function<Searcher, Searcher> wrapper,
1016+
SearcherScope scope,
1017+
SplitShardCountSummary splitShardCountSummary
1018+
) throws EngineException {
10101019
/* Acquire order here is store -> manager since we need
10111020
* to make sure that the store is not closed before
10121021
* the searcher is acquired. */
@@ -1017,7 +1026,7 @@ public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wra
10171026
try {
10181027
ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
10191028
ElasticsearchDirectoryReader acquire = referenceManager.acquire();
1020-
DirectoryReader wrappedDirectoryReader = wrapDirectoryReader(acquire);
1029+
DirectoryReader wrappedDirectoryReader = wrapDirectoryReader(acquire, splitShardCountSummary);
10211030
SearcherSupplier reader = new SearcherSupplier(wrapper) {
10221031
@Override
10231032
public Searcher acquireSearcherInternal(String source) {
@@ -1070,9 +1079,18 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin
10701079
}
10711080

10721081
public Searcher acquireSearcher(String source, SearcherScope scope, Function<Searcher, Searcher> wrapper) throws EngineException {
1082+
return acquireSearcher(source, scope, SplitShardCountSummary.UNSET, wrapper);
1083+
}
1084+
1085+
public Searcher acquireSearcher(
1086+
String source,
1087+
SearcherScope scope,
1088+
SplitShardCountSummary splitShardCountSummary,
1089+
Function<Searcher, Searcher> wrapper
1090+
) throws EngineException {
10731091
SearcherSupplier releasable = null;
10741092
try {
1075-
SearcherSupplier reader = releasable = acquireSearcherSupplier(wrapper, scope);
1093+
SearcherSupplier reader = releasable = acquireSearcherSupplier(wrapper, scope, splitShardCountSummary);
10761094
Searcher searcher = reader.acquireSearcher(source);
10771095
releasable = null;
10781096
onSearcherCreation(source, scope);

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.elasticsearch.cluster.routing.RecoverySource;
4545
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
4646
import org.elasticsearch.cluster.routing.ShardRouting;
47+
import org.elasticsearch.cluster.routing.SplitShardCountSummary;
4748
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
4849
import org.elasticsearch.cluster.service.ClusterApplierService;
4950
import org.elasticsearch.cluster.service.MasterService;
@@ -1712,14 +1713,22 @@ public Engine.SearcherSupplier acquireSearcherSupplier() {
17121713
return acquireSearcherSupplier(Engine.SearcherScope.EXTERNAL);
17131714
}
17141715

1716+
public Engine.SearcherSupplier acquireSearcherSupplier(SplitShardCountSummary splitShardCountSummary) {
1717+
return acquireSearcherSupplier(Engine.SearcherScope.EXTERNAL, splitShardCountSummary);
1718+
}
1719+
17151720
/**
17161721
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
17171722
*/
17181723
public Engine.SearcherSupplier acquireSearcherSupplier(Engine.SearcherScope scope) {
1724+
return acquireSearcherSupplier(scope, SplitShardCountSummary.UNSET);
1725+
}
1726+
1727+
public Engine.SearcherSupplier acquireSearcherSupplier(Engine.SearcherScope scope, SplitShardCountSummary splitShardCountSummary) {
17191728
readAllowed();
17201729
markSearcherAccessed();
17211730
final Engine engine = getEngine();
1722-
return engine.acquireSearcherSupplier(this::wrapSearcher, scope);
1731+
return engine.acquireSearcherSupplier(this::wrapSearcher, scope, splitShardCountSummary);
17231732
}
17241733

17251734
public Engine.Searcher acquireSearcher(String source) {

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1272,7 +1272,7 @@ final ReaderContext createOrGetReaderContext(ShardSearchRequest request) {
12721272
// calculated from the ids of the underlying segments of an index commit
12731273
final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
12741274
final IndexShard shard = indexService.getShard(request.shardId().id());
1275-
final Engine.SearcherSupplier searcherSupplier = shard.acquireSearcherSupplier();
1275+
final Engine.SearcherSupplier searcherSupplier = shard.acquireSearcherSupplier(request.getSplitShardCountSummary());
12761276
if (contextId.sameSearcherIdsAs(searcherSupplier.getSearcherId()) == false) {
12771277
searcherSupplier.close();
12781278
throw e;
@@ -1296,7 +1296,13 @@ final ReaderContext createOrGetReaderContext(ShardSearchRequest request) {
12961296
}
12971297
final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
12981298
final IndexShard shard = indexService.getShard(request.shardId().id());
1299-
return createAndPutReaderContext(request, indexService, shard, shard.acquireSearcherSupplier(), keepAliveInMillis);
1299+
return createAndPutReaderContext(
1300+
request,
1301+
indexService,
1302+
shard,
1303+
shard.acquireSearcherSupplier(request.getSplitShardCountSummary()),
1304+
keepAliveInMillis
1305+
);
13001306
}
13011307

13021308
final ReaderContext createAndPutReaderContext(

server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public class ShardSearchRequest extends AbstractTransportRequest implements Indi
105105
/**
106106
* Additional metadata specific to the resharding feature. See {@link org.elasticsearch.cluster.routing.SplitShardCountSummary}.
107107
*/
108-
private final SplitShardCountSummary reshardSplitShardCountSummary;
108+
private final SplitShardCountSummary splitShardCountSummary;
109109

110110
public static final TransportVersion SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY = TransportVersion.fromName(
111111
"shard_search_request_reshard_shard_count_summary"
@@ -151,7 +151,7 @@ public ShardSearchRequest(
151151
@Nullable String clusterAlias,
152152
ShardSearchContextId readerId,
153153
TimeValue keepAlive,
154-
SplitShardCountSummary reshardSplitShardCountSummary
154+
SplitShardCountSummary splitShardCountSummary
155155
) {
156156
this(
157157
originalIndices,
@@ -172,7 +172,7 @@ public ShardSearchRequest(
172172
computeWaitForCheckpoint(searchRequest.getWaitForCheckpoints(), shardId, shardRequestIndex),
173173
searchRequest.getWaitForCheckpointsTimeout(),
174174
searchRequest.isForceSyntheticSource(),
175-
reshardSplitShardCountSummary
175+
splitShardCountSummary
176176
);
177177
// If allowPartialSearchResults is unset (ie null), the cluster-level default should have been substituted
178178
// at this stage. Any NPEs in the above are therefore an error in request preparation logic.
@@ -206,7 +206,7 @@ public ShardSearchRequest(
206206
long nowInMillis,
207207
AliasFilter aliasFilter,
208208
String clusterAlias,
209-
SplitShardCountSummary reshardSplitShardCountSummary
209+
SplitShardCountSummary splitShardCountSummary
210210
) {
211211
this(
212212
OriginalIndices.NONE,
@@ -227,7 +227,7 @@ public ShardSearchRequest(
227227
SequenceNumbers.UNASSIGNED_SEQ_NO,
228228
SearchService.NO_TIMEOUT,
229229
false,
230-
reshardSplitShardCountSummary
230+
splitShardCountSummary
231231
);
232232
}
233233

@@ -251,7 +251,7 @@ public ShardSearchRequest(
251251
long waitForCheckpoint,
252252
TimeValue waitForCheckpointsTimeout,
253253
boolean forceSyntheticSource,
254-
SplitShardCountSummary reshardSplitShardCountSummary
254+
SplitShardCountSummary splitShardCountSummary
255255
) {
256256
this.shardId = shardId;
257257
this.shardRequestIndex = shardRequestIndex;
@@ -273,7 +273,7 @@ public ShardSearchRequest(
273273
this.waitForCheckpoint = waitForCheckpoint;
274274
this.waitForCheckpointsTimeout = waitForCheckpointsTimeout;
275275
this.forceSyntheticSource = forceSyntheticSource;
276-
this.reshardSplitShardCountSummary = reshardSplitShardCountSummary;
276+
this.splitShardCountSummary = splitShardCountSummary;
277277
}
278278

279279
@SuppressWarnings("this-escape")
@@ -299,7 +299,7 @@ public ShardSearchRequest(ShardSearchRequest clone) {
299299
this.waitForCheckpoint = clone.waitForCheckpoint;
300300
this.waitForCheckpointsTimeout = clone.waitForCheckpointsTimeout;
301301
this.forceSyntheticSource = clone.forceSyntheticSource;
302-
this.reshardSplitShardCountSummary = clone.reshardSplitShardCountSummary;
302+
this.splitShardCountSummary = clone.splitShardCountSummary;
303303
}
304304

305305
public ShardSearchRequest(StreamInput in) throws IOException {
@@ -367,9 +367,9 @@ public ShardSearchRequest(StreamInput in) throws IOException {
367367
forceSyntheticSource = false;
368368
}
369369
if (in.getTransportVersion().supports(SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
370-
reshardSplitShardCountSummary = new SplitShardCountSummary(in);
370+
splitShardCountSummary = new SplitShardCountSummary(in);
371371
} else {
372-
reshardSplitShardCountSummary = SplitShardCountSummary.UNSET;
372+
splitShardCountSummary = SplitShardCountSummary.UNSET;
373373
}
374374

375375
originalIndices = OriginalIndices.readOriginalIndices(in);
@@ -433,7 +433,7 @@ protected final void innerWriteTo(StreamOutput out, boolean asKey) throws IOExce
433433
}
434434
}
435435
if (out.getTransportVersion().supports(SHARD_SEARCH_REQUEST_RESHARD_SHARD_COUNT_SUMMARY)) {
436-
reshardSplitShardCountSummary.writeTo(out);
436+
splitShardCountSummary.writeTo(out);
437437
}
438438
}
439439

@@ -592,6 +592,10 @@ public String getClusterAlias() {
592592
return clusterAlias;
593593
}
594594

595+
public SplitShardCountSummary getSplitShardCountSummary() {
596+
return splitShardCountSummary;
597+
}
598+
595599
@Override
596600
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
597601
return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers);

server/src/test/java/org/elasticsearch/action/search/SearchShardIteratorTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public void testEqualsAndHashcode() {
125125
s.getSearchContextKeepAlive(),
126126
s.prefiltered(),
127127
s.skip(),
128-
s.getReshardSplitShardCountSummary()
128+
s.getSplitShardCountSummary()
129129
),
130130
s -> {
131131
if (randomBoolean()) {
@@ -144,7 +144,7 @@ public void testEqualsAndHashcode() {
144144
s.getSearchContextKeepAlive(),
145145
s.prefiltered(),
146146
s.skip(),
147-
s.getReshardSplitShardCountSummary()
147+
s.getSplitShardCountSummary()
148148
);
149149
} else {
150150
ShardId shardId = new ShardId(
@@ -161,7 +161,7 @@ public void testEqualsAndHashcode() {
161161
s.getSearchContextKeepAlive(),
162162
s.prefiltered(),
163163
s.skip(),
164-
s.getReshardSplitShardCountSummary()
164+
s.getSplitShardCountSummary()
165165
);
166166
}
167167
}
@@ -233,7 +233,7 @@ public void testCompareToEqualItems() {
233233
shardIterator1.getSearchContextKeepAlive(),
234234
shardIterator1.prefiltered(),
235235
shardIterator1.skip(),
236-
shardIterator1.getReshardSplitShardCountSummary()
236+
shardIterator1.getSplitShardCountSummary()
237237
);
238238
assertEquals(shardIterator1, shardIterator2);
239239
assertEquals(0, shardIterator1.compareTo(shardIterator2));

0 commit comments

Comments
 (0)