Skip to content

Commit 293dc09

Browse files
authored
Increase concurrent request of opening point-in-time (#96782) (#96957)
Today, we mistakenly throttle the open point-in-time API to 1 request per node. As a result, when attempting to open a point-in-time across large clusters, it can take a significant amount of time and eventually fail due to relocated target shards or deleted target indices managed by ILM. Ideally, we should batch the requests per node and eliminate this throttle completely. However, this requires all clusters to be on the latest version. This PR increases the number of concurrent requests from 1 to 5, which is the default for search.
1 parent a6d8f05 commit 293dc09

File tree

7 files changed

+157
-3
lines changed

7 files changed

+157
-3
lines changed

docs/changelog/96782.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 96782
2+
summary: Increase concurrent request of opening point-in-time
3+
area: Search
4+
type: bug
5+
issues: []

server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,20 @@
1212
import org.elasticsearch.ExceptionsHelper;
1313
import org.elasticsearch.action.admin.indices.stats.CommonStats;
1414
import org.elasticsearch.action.support.IndicesOptions;
15+
import org.elasticsearch.action.support.PlainActionFuture;
1516
import org.elasticsearch.cluster.metadata.IndexMetadata;
1617
import org.elasticsearch.cluster.node.DiscoveryNode;
1718
import org.elasticsearch.cluster.routing.ShardRouting;
1819
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.common.util.CollectionUtils;
1921
import org.elasticsearch.core.TimeValue;
2022
import org.elasticsearch.index.IndexService;
2123
import org.elasticsearch.index.IndexSettings;
2224
import org.elasticsearch.index.query.MatchAllQueryBuilder;
2325
import org.elasticsearch.index.query.RangeQueryBuilder;
2426
import org.elasticsearch.index.shard.IndexShard;
2527
import org.elasticsearch.indices.IndicesService;
28+
import org.elasticsearch.plugins.Plugin;
2629
import org.elasticsearch.rest.RestStatus;
2730
import org.elasticsearch.search.SearchContextMissingException;
2831
import org.elasticsearch.search.SearchHit;
@@ -33,10 +36,14 @@
3336
import org.elasticsearch.search.sort.SortOrder;
3437
import org.elasticsearch.tasks.TaskInfo;
3538
import org.elasticsearch.test.ESIntegTestCase;
39+
import org.elasticsearch.test.transport.MockTransportService;
40+
import org.elasticsearch.transport.TransportService;
3641

42+
import java.util.Collection;
3743
import java.util.HashSet;
3844
import java.util.List;
3945
import java.util.Set;
46+
import java.util.concurrent.CountDownLatch;
4047
import java.util.concurrent.TimeUnit;
4148
import java.util.stream.Collectors;
4249

@@ -54,6 +61,11 @@
5461

5562
public class PointInTimeIT extends ESIntegTestCase {
5663

64+
@Override
65+
protected Collection<Class<? extends Plugin>> nodePlugins() {
66+
return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);
67+
}
68+
5769
@Override
5870
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
5971
return Settings.builder()
@@ -430,6 +442,52 @@ public void testCloseInvalidPointInTime() {
430442
assertThat(tasks, empty());
431443
}
432444

445+
public void testOpenPITConcurrentShardRequests() throws Exception {
446+
DiscoveryNode dataNode = randomFrom(clusterService().state().nodes().getDataNodes().values());
447+
int numShards = randomIntBetween(5, 10);
448+
int maxConcurrentRequests = randomIntBetween(2, 5);
449+
assertAcked(
450+
client().admin()
451+
.indices()
452+
.prepareCreate("test")
453+
.setSettings(
454+
Settings.builder()
455+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards)
456+
.put("index.routing.allocation.require._id", dataNode.getId())
457+
.build()
458+
)
459+
);
460+
var transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getName());
461+
try {
462+
CountDownLatch sentLatch = new CountDownLatch(maxConcurrentRequests);
463+
CountDownLatch readyLatch = new CountDownLatch(1);
464+
transportService.addRequestHandlingBehavior(
465+
TransportOpenPointInTimeAction.OPEN_SHARD_READER_CONTEXT_NAME,
466+
(handler, request, channel, task) -> {
467+
sentLatch.countDown();
468+
Thread thread = new Thread(() -> {
469+
try {
470+
assertTrue(readyLatch.await(1, TimeUnit.MINUTES));
471+
handler.messageReceived(request, channel, task);
472+
} catch (Exception e) {
473+
throw new AssertionError(e);
474+
}
475+
});
476+
thread.start();
477+
}
478+
);
479+
OpenPointInTimeRequest request = new OpenPointInTimeRequest("test").keepAlive(TimeValue.timeValueMinutes(1));
480+
request.maxConcurrentShardRequests(maxConcurrentRequests);
481+
PlainActionFuture<OpenPointInTimeResponse> future = new PlainActionFuture<>();
482+
client().execute(OpenPointInTimeAction.INSTANCE, request, future);
483+
assertTrue(sentLatch.await(1, TimeUnit.MINUTES));
484+
readyLatch.countDown();
485+
closePointInTime(future.actionGet().getPointInTimeId());
486+
} finally {
487+
transportService.clearAllRules();
488+
}
489+
}
490+
433491
@SuppressWarnings({ "rawtypes", "unchecked" })
434492
private void assertPagination(PointInTimeBuilder pit, int expectedNumDocs, int size, SortBuilder<?>... sorts) throws Exception {
435493
Set<String> seen = new HashSet<>();

server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeRequest.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@
2626
import static org.elasticsearch.action.ValidateActions.addValidationError;
2727

2828
public final class OpenPointInTimeRequest extends ActionRequest implements IndicesRequest.Replaceable {
29+
2930
private String[] indices;
3031
private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;
3132
private TimeValue keepAlive;
32-
33+
private int maxConcurrentShardRequests = SearchRequest.DEFAULT_MAX_CONCURRENT_SHARD_REQUESTS;
3334
@Nullable
3435
private String routing;
3536
@Nullable
@@ -123,6 +124,27 @@ public OpenPointInTimeRequest preference(String preference) {
123124
return this;
124125
}
125126

127+
/**
128+
* Similar to {@link SearchRequest#getMaxConcurrentShardRequests()}, this returns the number of shard requests that should be
129+
* executed concurrently on a single node . This value should be used as a protection mechanism to reduce the number of shard
130+
* requests fired per open point-in-time request. The default is {@code 5}
131+
*/
132+
public int maxConcurrentShardRequests() {
133+
return maxConcurrentShardRequests;
134+
}
135+
136+
/**
137+
* Similar to {@link SearchRequest#setMaxConcurrentShardRequests(int)}, this sets the number of shard requests that should be
138+
* executed concurrently on a single node. This value should be used as a protection mechanism to reduce the number of shard
139+
* requests fired per open point-in-time request.
140+
*/
141+
public void maxConcurrentShardRequests(int maxConcurrentShardRequests) {
142+
if (maxConcurrentShardRequests < 1) {
143+
throw new IllegalArgumentException("maxConcurrentShardRequests must be >= 1");
144+
}
145+
this.maxConcurrentShardRequests = maxConcurrentShardRequests;
146+
}
147+
126148
@Override
127149
public boolean allowsRemoteIndices() {
128150
return true;

server/src/main/java/org/elasticsearch/action/search/RestOpenPointInTimeAction.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,13 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
4444
openRequest.routing(request.param("routing"));
4545
openRequest.preference(request.param("preference"));
4646
openRequest.keepAlive(TimeValue.parseTimeValue(request.param("keep_alive"), null, "keep_alive"));
47+
if (request.hasParam("max_concurrent_shard_requests")) {
48+
final int maxConcurrentShardRequests = request.paramAsInt(
49+
"max_concurrent_shard_requests",
50+
openRequest.maxConcurrentShardRequests()
51+
);
52+
openRequest.maxConcurrentShardRequests(maxConcurrentShardRequests);
53+
}
4754
return channel -> client.execute(OpenPointInTimeAction.INSTANCE, openRequest, new RestToXContentListener<>(channel));
4855
}
4956
}

server/src/main/java/org/elasticsearch/action/search/SearchRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
8787
private int batchedReduceSize = DEFAULT_BATCHED_REDUCE_SIZE;
8888

8989
private int maxConcurrentShardRequests = 0;
90+
public static final int DEFAULT_MAX_CONCURRENT_SHARD_REQUESTS = 5;
9091

9192
private Integer preFilterShardSize;
9293

@@ -713,7 +714,7 @@ public int getBatchedReduceSize() {
713714
* cluster can be throttled with this number to reduce the cluster load. The default is {@code 5}
714715
*/
715716
public int getMaxConcurrentShardRequests() {
716-
return maxConcurrentShardRequests == 0 ? 5 : maxConcurrentShardRequests;
717+
return maxConcurrentShardRequests == 0 ? DEFAULT_MAX_CONCURRENT_SHARD_REQUESTS : maxConcurrentShardRequests;
717718
}
718719

719720
/**

server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen
8686
.preference(request.preference())
8787
.routing(request.routing())
8888
.allowPartialSearchResults(false);
89+
searchRequest.setMaxConcurrentShardRequests(request.maxConcurrentShardRequests());
8990
searchRequest.setCcsMinimizeRoundtrips(false);
9091
transportSearchAction.executeRequest((SearchTask) task, searchRequest, listener.map(r -> {
9192
assert r.pointInTimeId() != null : r;
@@ -117,6 +118,8 @@ public SearchPhase newSearchPhase(
117118
ThreadPool threadPool,
118119
SearchResponse.Clusters clusters
119120
) {
121+
assert searchRequest.getMaxConcurrentShardRequests() == pitRequest.maxConcurrentShardRequests()
122+
: searchRequest.getMaxConcurrentShardRequests() + " != " + pitRequest.maxConcurrentShardRequests();
120123
return new AbstractSearchAsyncAction<>(
121124
actionName,
122125
logger,
@@ -132,7 +135,7 @@ public SearchPhase newSearchPhase(
132135
clusterState,
133136
task,
134137
new ArraySearchPhaseResults<>(shardIterators.size()),
135-
1,
138+
searchRequest.getMaxConcurrentShardRequests(),
136139
clusters
137140
) {
138141
@Override
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.action.search;
10+
11+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
12+
import org.elasticsearch.rest.RestRequest;
13+
import org.elasticsearch.test.rest.FakeRestRequest;
14+
import org.elasticsearch.test.rest.RestActionTestCase;
15+
16+
import java.util.HashMap;
17+
import java.util.Map;
18+
import java.util.Queue;
19+
20+
import static org.hamcrest.Matchers.equalTo;
21+
import static org.hamcrest.Matchers.hasSize;
22+
import static org.hamcrest.Matchers.instanceOf;
23+
24+
public class RestOpenPointInTimeActionTests extends RestActionTestCase {
25+
26+
public void testMaxConcurrentSearchRequests() {
27+
RestOpenPointInTimeAction action = new RestOpenPointInTimeAction();
28+
controller().registerHandler(action);
29+
Queue<OpenPointInTimeRequest> transportRequests = ConcurrentCollections.newQueue();
30+
verifyingClient.setExecuteVerifier(((actionType, transportRequest) -> {
31+
assertThat(transportRequest, instanceOf(OpenPointInTimeRequest.class));
32+
transportRequests.add((OpenPointInTimeRequest) transportRequest);
33+
return new OpenPointInTimeResponse("n/a");
34+
}));
35+
{
36+
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
37+
.withPath("/_pit")
38+
.build();
39+
dispatchRequest(request);
40+
assertThat(transportRequests, hasSize(1));
41+
OpenPointInTimeRequest transportRequest = transportRequests.remove();
42+
assertThat(transportRequest.maxConcurrentShardRequests(), equalTo(5));
43+
}
44+
{
45+
int maxConcurrentRequests = randomIntBetween(1, 100);
46+
Map<String, String> params = new HashMap<>();
47+
params.put("max_concurrent_shard_requests", Integer.toString(maxConcurrentRequests));
48+
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
49+
.withPath("/_pit")
50+
.withParams(params)
51+
.build();
52+
dispatchRequest(request);
53+
assertThat(transportRequests, hasSize(1));
54+
OpenPointInTimeRequest transportRequest = transportRequests.remove();
55+
assertThat(transportRequest.maxConcurrentShardRequests(), equalTo(maxConcurrentRequests));
56+
}
57+
}
58+
}

0 commit comments

Comments
 (0)