Skip to content

Commit a8fbd24

Browse files
authored
Increase concurrent request of opening point-in-time (#96782)
Today, we mistakenly throttle the opening point-in-time API to 1 request per node. As a result, when attempting to open a point-in-time across large clusters, it can take a significant amount of time and eventually fail due to relocated target shards or deleted target indices managed by ILM. Ideally, we should batch the requests per node and eliminate this throttle completely. However, this requires all clusters to be on the latest version. ~This PR increases the number of concurrent requests from 1 to 20. This default is higher than search, which is 5, because opening point-in-time is a lightweight operation, doesn't perform any I/O, and is executed directly on the network threads.~ This PR increases the number of concurrent requests from 1 to 5, which is the default for search. Any suggestions are welcome.
1 parent 6cf467f commit a8fbd24

File tree

9 files changed

+316
-4
lines changed

9 files changed

+316
-4
lines changed

docs/changelog/96782.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 96782
2+
summary: Increase concurrent request of opening point-in-time
3+
area: Search
4+
type: bug
5+
issues: []

server/src/internalClusterTest/java/org/elasticsearch/action/search/PointInTimeIT.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,20 @@
1212
import org.elasticsearch.ExceptionsHelper;
1313
import org.elasticsearch.action.admin.indices.stats.CommonStats;
1414
import org.elasticsearch.action.support.IndicesOptions;
15+
import org.elasticsearch.action.support.PlainActionFuture;
1516
import org.elasticsearch.cluster.metadata.IndexMetadata;
1617
import org.elasticsearch.cluster.node.DiscoveryNode;
1718
import org.elasticsearch.cluster.routing.ShardRouting;
1819
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.common.util.CollectionUtils;
1921
import org.elasticsearch.core.TimeValue;
2022
import org.elasticsearch.index.IndexService;
2123
import org.elasticsearch.index.IndexSettings;
2224
import org.elasticsearch.index.query.MatchAllQueryBuilder;
2325
import org.elasticsearch.index.query.RangeQueryBuilder;
2426
import org.elasticsearch.index.shard.IndexShard;
2527
import org.elasticsearch.indices.IndicesService;
28+
import org.elasticsearch.plugins.Plugin;
2629
import org.elasticsearch.rest.RestStatus;
2730
import org.elasticsearch.search.SearchContextMissingException;
2831
import org.elasticsearch.search.SearchHit;
@@ -33,10 +36,14 @@
3336
import org.elasticsearch.search.sort.SortOrder;
3437
import org.elasticsearch.tasks.TaskInfo;
3538
import org.elasticsearch.test.ESIntegTestCase;
39+
import org.elasticsearch.test.transport.MockTransportService;
40+
import org.elasticsearch.transport.TransportService;
3641

42+
import java.util.Collection;
3743
import java.util.HashSet;
3844
import java.util.List;
3945
import java.util.Set;
46+
import java.util.concurrent.CountDownLatch;
4047
import java.util.concurrent.TimeUnit;
4148
import java.util.stream.Collectors;
4249

@@ -54,6 +61,11 @@
5461

5562
public class PointInTimeIT extends ESIntegTestCase {
5663

64+
@Override
65+
protected Collection<Class<? extends Plugin>> nodePlugins() {
66+
return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);
67+
}
68+
5769
@Override
5870
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
5971
return Settings.builder()
@@ -430,6 +442,52 @@ public void testCloseInvalidPointInTime() {
430442
assertThat(tasks, empty());
431443
}
432444

445+
public void testOpenPITConcurrentShardRequests() throws Exception {
446+
DiscoveryNode dataNode = randomFrom(clusterService().state().nodes().getDataNodes().values());
447+
int numShards = randomIntBetween(5, 10);
448+
int maxConcurrentRequests = randomIntBetween(2, 5);
449+
assertAcked(
450+
client().admin()
451+
.indices()
452+
.prepareCreate("test")
453+
.setSettings(
454+
Settings.builder()
455+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards)
456+
.put("index.routing.allocation.require._id", dataNode.getId())
457+
.build()
458+
)
459+
);
460+
var transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getName());
461+
try {
462+
CountDownLatch sentLatch = new CountDownLatch(maxConcurrentRequests);
463+
CountDownLatch readyLatch = new CountDownLatch(1);
464+
transportService.addRequestHandlingBehavior(
465+
TransportOpenPointInTimeAction.OPEN_SHARD_READER_CONTEXT_NAME,
466+
(handler, request, channel, task) -> {
467+
sentLatch.countDown();
468+
Thread thread = new Thread(() -> {
469+
try {
470+
assertTrue(readyLatch.await(1, TimeUnit.MINUTES));
471+
handler.messageReceived(request, channel, task);
472+
} catch (Exception e) {
473+
throw new AssertionError(e);
474+
}
475+
});
476+
thread.start();
477+
}
478+
);
479+
OpenPointInTimeRequest request = new OpenPointInTimeRequest("test").keepAlive(TimeValue.timeValueMinutes(1));
480+
request.maxConcurrentShardRequests(maxConcurrentRequests);
481+
PlainActionFuture<OpenPointInTimeResponse> future = new PlainActionFuture<>();
482+
client().execute(OpenPointInTimeAction.INSTANCE, request, future);
483+
assertTrue(sentLatch.await(1, TimeUnit.MINUTES));
484+
readyLatch.countDown();
485+
closePointInTime(future.actionGet().getPointInTimeId());
486+
} finally {
487+
transportService.clearAllRules();
488+
}
489+
}
490+
433491
@SuppressWarnings({ "rawtypes", "unchecked" })
434492
private void assertPagination(PointInTimeBuilder pit, int expectedNumDocs, int size, SortBuilder<?>... sorts) throws Exception {
435493
Set<String> seen = new HashSet<>();

server/src/main/java/org/elasticsearch/TransportVersion.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,10 @@ private static TransportVersion registerTransportVersion(int id, String uniqueId
141141
public static final TransportVersion V_8_500_014 = registerTransportVersion(8_500_014, "D115A2E1-1739-4A02-AB7B-64F6EA157EFB");
142142
public static final TransportVersion V_8_500_015 = registerTransportVersion(8_500_015, "651216c9-d54f-4189-9fe1-48d82d276863");
143143
public static final TransportVersion V_8_500_016 = registerTransportVersion(8_500_016, "492C94FB-AAEA-4C9E-8375-BDB67A398584");
144+
public static final TransportVersion V_8_500_017 = registerTransportVersion(8_500_017, "0EDCB5BA-049C-443C-8AB1-5FA58FB996FB");
144145

145146
private static class CurrentHolder {
146-
private static final TransportVersion CURRENT = findCurrent(V_8_500_016);
147+
private static final TransportVersion CURRENT = findCurrent(V_8_500_017);
147148

148149
// finds the pluggable current version, or uses the given fallback
149150
private static TransportVersion findCurrent(TransportVersion fallback) {

server/src/main/java/org/elasticsearch/action/search/OpenPointInTimeRequest.java

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.elasticsearch.action.search;
1010

11+
import org.elasticsearch.TransportVersion;
1112
import org.elasticsearch.action.ActionRequest;
1213
import org.elasticsearch.action.ActionRequestValidationException;
1314
import org.elasticsearch.action.IndicesRequest;
@@ -20,16 +21,18 @@
2021
import org.elasticsearch.tasks.TaskId;
2122

2223
import java.io.IOException;
24+
import java.util.Arrays;
2325
import java.util.Map;
2426
import java.util.Objects;
2527

2628
import static org.elasticsearch.action.ValidateActions.addValidationError;
2729

2830
public final class OpenPointInTimeRequest extends ActionRequest implements IndicesRequest.Replaceable {
31+
2932
private String[] indices;
3033
private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;
3134
private TimeValue keepAlive;
32-
35+
private int maxConcurrentShardRequests = SearchRequest.DEFAULT_MAX_CONCURRENT_SHARD_REQUESTS;
3336
@Nullable
3437
private String routing;
3538
@Nullable
@@ -48,6 +51,9 @@ public OpenPointInTimeRequest(StreamInput in) throws IOException {
4851
this.keepAlive = in.readTimeValue();
4952
this.routing = in.readOptionalString();
5053
this.preference = in.readOptionalString();
54+
if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_500_017)) {
55+
this.maxConcurrentShardRequests = in.readVInt();
56+
}
5157
}
5258

5359
@Override
@@ -58,6 +64,9 @@ public void writeTo(StreamOutput out) throws IOException {
5864
out.writeTimeValue(keepAlive);
5965
out.writeOptionalString(routing);
6066
out.writeOptionalString(preference);
67+
if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_500_017)) {
68+
out.writeVInt(maxConcurrentShardRequests);
69+
}
6170
}
6271

6372
@Override
@@ -123,6 +132,27 @@ public OpenPointInTimeRequest preference(String preference) {
123132
return this;
124133
}
125134

135+
/**
136+
* Similar to {@link SearchRequest#getMaxConcurrentShardRequests()}, this returns the number of shard requests that should be
137+
* executed concurrently on a single node. This value should be used as a protection mechanism to reduce the number of shard
138+
* requests fired per open point-in-time request. The default is {@code 5}.
139+
*/
140+
public int maxConcurrentShardRequests() {
141+
return maxConcurrentShardRequests;
142+
}
143+
144+
/**
145+
* Similar to {@link SearchRequest#setMaxConcurrentShardRequests(int)}, this sets the number of shard requests that should be
146+
* executed concurrently on a single node. This value should be used as a protection mechanism to reduce the number of shard
147+
* requests fired per open point-in-time request.
148+
*/
149+
public void maxConcurrentShardRequests(int maxConcurrentShardRequests) {
150+
if (maxConcurrentShardRequests < 1) {
151+
throw new IllegalArgumentException("maxConcurrentShardRequests must be >= 1");
152+
}
153+
this.maxConcurrentShardRequests = maxConcurrentShardRequests;
154+
}
155+
126156
@Override
127157
public boolean allowsRemoteIndices() {
128158
return true;
@@ -138,8 +168,46 @@ public String getDescription() {
138168
return "open search context: indices [" + String.join(",", indices) + "] keep_alive [" + keepAlive + "]";
139169
}
140170

171+
@Override
172+
public String toString() {
173+
return "OpenPointInTimeRequest{"
174+
+ "indices="
175+
+ Arrays.toString(indices)
176+
+ ", keepAlive="
177+
+ keepAlive
178+
+ ", maxConcurrentShardRequests="
179+
+ maxConcurrentShardRequests
180+
+ ", routing='"
181+
+ routing
182+
+ '\''
183+
+ ", preference='"
184+
+ preference
185+
+ '\''
186+
+ '}';
187+
}
188+
141189
@Override
142190
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
143191
return new SearchTask(id, type, action, this::getDescription, parentTaskId, headers);
144192
}
193+
194+
@Override
195+
public boolean equals(Object o) {
196+
if (this == o) return true;
197+
if (o == null || getClass() != o.getClass()) return false;
198+
OpenPointInTimeRequest that = (OpenPointInTimeRequest) o;
199+
return maxConcurrentShardRequests == that.maxConcurrentShardRequests
200+
&& Arrays.equals(indices, that.indices)
201+
&& indicesOptions.equals(that.indicesOptions)
202+
&& keepAlive.equals(that.keepAlive)
203+
&& Objects.equals(routing, that.routing)
204+
&& Objects.equals(preference, that.preference);
205+
}
206+
207+
@Override
208+
public int hashCode() {
209+
int result = Objects.hash(indicesOptions, keepAlive, maxConcurrentShardRequests, routing, preference);
210+
result = 31 * result + Arrays.hashCode(indices);
211+
return result;
212+
}
145213
}

server/src/main/java/org/elasticsearch/action/search/RestOpenPointInTimeAction.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,13 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
4444
openRequest.routing(request.param("routing"));
4545
openRequest.preference(request.param("preference"));
4646
openRequest.keepAlive(TimeValue.parseTimeValue(request.param("keep_alive"), null, "keep_alive"));
47+
if (request.hasParam("max_concurrent_shard_requests")) {
48+
final int maxConcurrentShardRequests = request.paramAsInt(
49+
"max_concurrent_shard_requests",
50+
openRequest.maxConcurrentShardRequests()
51+
);
52+
openRequest.maxConcurrentShardRequests(maxConcurrentShardRequests);
53+
}
4754
return channel -> client.execute(OpenPointInTimeAction.INSTANCE, openRequest, new RestToXContentListener<>(channel));
4855
}
4956
}

server/src/main/java/org/elasticsearch/action/search/SearchRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
8787
private int batchedReduceSize = DEFAULT_BATCHED_REDUCE_SIZE;
8888

8989
private int maxConcurrentShardRequests = 0;
90+
public static final int DEFAULT_MAX_CONCURRENT_SHARD_REQUESTS = 5;
9091

9192
private Integer preFilterShardSize;
9293

@@ -716,7 +717,7 @@ public int getBatchedReduceSize() {
716717
* cluster can be throttled with this number to reduce the cluster load. The default is {@code 5}
717718
*/
718719
public int getMaxConcurrentShardRequests() {
719-
return maxConcurrentShardRequests == 0 ? 5 : maxConcurrentShardRequests;
720+
return maxConcurrentShardRequests == 0 ? DEFAULT_MAX_CONCURRENT_SHARD_REQUESTS : maxConcurrentShardRequests;
720721
}
721722

722723
/**

server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen
8686
.preference(request.preference())
8787
.routing(request.routing())
8888
.allowPartialSearchResults(false);
89+
searchRequest.setMaxConcurrentShardRequests(request.maxConcurrentShardRequests());
8990
searchRequest.setCcsMinimizeRoundtrips(false);
9091
transportSearchAction.executeRequest((SearchTask) task, searchRequest, listener.map(r -> {
9192
assert r.pointInTimeId() != null : r;
@@ -117,6 +118,8 @@ public SearchPhase newSearchPhase(
117118
ThreadPool threadPool,
118119
SearchResponse.Clusters clusters
119120
) {
121+
assert searchRequest.getMaxConcurrentShardRequests() == pitRequest.maxConcurrentShardRequests()
122+
: searchRequest.getMaxConcurrentShardRequests() + " != " + pitRequest.maxConcurrentShardRequests();
120123
return new AbstractSearchAsyncAction<>(
121124
actionName,
122125
logger,
@@ -132,7 +135,7 @@ public SearchPhase newSearchPhase(
132135
clusterState,
133136
task,
134137
new ArraySearchPhaseResults<>(shardIterators.size()),
135-
1,
138+
searchRequest.getMaxConcurrentShardRequests(),
136139
clusters
137140
) {
138141
@Override

0 commit comments

Comments
 (0)