Skip to content

Commit a8961e9

Browse files
Add a user-configurable timeout parameter to the _resolve/cluster API (#120542) (#121142)
Previously, if a remote cluster was unresponsive, `_resolve/cluster` would wait until Netty stepped in and terminated the connection. We initially responded to this issue by switching the disconnect strategy. However, this was problematic because it defeated the whole purpose of this API call: re-establishing the connection if and when possible. We now address the issue by adding a user-configurable `timeout` GET parameter. This PR also reverts the problematic disconnect strategy. Example: `GET _resolve/cluster/*:*?timeout=5s` (cherry picked from commit d27a8e0)
1 parent 04cdac2 commit a8961e9

File tree

6 files changed

+161
-9
lines changed

6 files changed

+161
-9
lines changed

docs/changelog/120542.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 120542
2+
summary: "Feat: add a user-configurable timeout parameter to the `_resolve/cluster`\
3+
\ API"
4+
area: Search
5+
type: enhancement
6+
issues: []
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.indices.cluster;
11+
12+
import org.elasticsearch.action.admin.indices.resolve.ResolveClusterActionRequest;
13+
import org.elasticsearch.action.admin.indices.resolve.ResolveClusterActionResponse;
14+
import org.elasticsearch.action.admin.indices.resolve.ResolveClusterInfo;
15+
import org.elasticsearch.action.admin.indices.resolve.TransportResolveClusterAction;
16+
import org.elasticsearch.action.support.IndicesOptions;
17+
import org.elasticsearch.core.TimeValue;
18+
import org.elasticsearch.test.AbstractMultiClustersTestCase;
19+
import org.elasticsearch.test.transport.MockTransportService;
20+
import org.elasticsearch.transport.TransportService;
21+
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.concurrent.CountDownLatch;
25+
26+
import static org.hamcrest.Matchers.equalTo;
27+
import static org.hamcrest.Matchers.is;
28+
import static org.hamcrest.Matchers.nullValue;
29+
30+
/**
 * Integration test for the timeout setting of the resolve-cluster action
 * ({@link TransportResolveClusterAction}), run against a single remote cluster
 * registered under the alias {@code cluster-a}.
 */
public class ResolveClusterTimeoutIT extends AbstractMultiClustersTestCase {
31+
private static final String REMOTE_CLUSTER_1 = "cluster-a";
32+
33+
@Override
34+
protected List<String> remoteClusterAlias() {
35+
return List.of(REMOTE_CLUSTER_1);
36+
}
37+
38+
/**
 * Verifies the request timeout in two phases:
 * first, with a generous timeout and a responsive remote, the remote is reported as connected with no error;
 * second, with the remote's request handler blocked on a latch and a short timeout, the remote is reported
 * as not connected and the error field carries the timeout message.
 */
public void testTimeoutParameter() {
39+
long maxTimeoutInMillis = 500;
40+
41+
// First part: we query _resolve/cluster without stalling a remote.
42+
ResolveClusterActionRequest resolveClusterActionRequest;
43+
if (randomBoolean()) {
44+
resolveClusterActionRequest = new ResolveClusterActionRequest(new String[0], IndicesOptions.DEFAULT, true, true);
45+
} else {
46+
resolveClusterActionRequest = new ResolveClusterActionRequest(new String[] { "*:*" });
47+
}
48+
49+
// We set a timeout but since we don't stall any cluster, we should always get back a response just fine before the timeout.
50+
resolveClusterActionRequest.setTimeout(TimeValue.timeValueSeconds(10));
51+
ResolveClusterActionResponse clusterActionResponse = safeGet(
52+
client().execute(TransportResolveClusterAction.TYPE, resolveClusterActionRequest)
53+
);
54+
Map<String, ResolveClusterInfo> clusterInfo = clusterActionResponse.getResolveClusterInfo();
55+
56+
// Remote is connected and error message is null.
57+
assertThat(clusterInfo.get(REMOTE_CLUSTER_1).isConnected(), equalTo(true));
58+
assertThat(clusterInfo.get(REMOTE_CLUSTER_1).getError(), is(nullValue()));
59+
60+
// Second part: now we stall the remote and utilise the timeout feature.
61+
CountDownLatch latch = new CountDownLatch(1);
62+
63+
// Add an override so that the remote cluster receives the TransportResolveClusterAction request but stalls.
64+
for (var nodes : cluster(REMOTE_CLUSTER_1).getNodeNames()) {
65+
((MockTransportService) cluster(REMOTE_CLUSTER_1).getInstance(TransportService.class, nodes)).addRequestHandlingBehavior(
66+
TransportResolveClusterAction.REMOTE_TYPE.name(),
67+
(requestHandler, transportRequest, transportChannel, transportTask) -> {
68+
// Block until the test releases the latch, which it does only after the querying side has returned its (timed-out) response.
69+
latch.await();
70+
requestHandler.messageReceived(transportRequest, transportChannel, transportTask);
71+
}
72+
);
73+
}
74+
75+
long randomlyChosenTimeout = randomLongBetween(100, maxTimeoutInMillis);
76+
// We now randomly choose a timeout which is guaranteed to hit since the remote is stalled.
77+
resolveClusterActionRequest.setTimeout(TimeValue.timeValueMillis(randomlyChosenTimeout));
78+
79+
// NOTE(review): if safeGet throws here, countDown() below is never reached and the stalled handlers stay blocked on the
// latch; presumably worth a try/finally — confirm against how the test framework tears down the remote cluster.
clusterActionResponse = safeGet(client().execute(TransportResolveClusterAction.TYPE, resolveClusterActionRequest));
80+
latch.countDown();
81+
82+
clusterInfo = clusterActionResponse.getResolveClusterInfo();
83+
84+
// Ensure that the request timed out and that the remote is marked as not connected.
85+
assertThat(clusterInfo.get(REMOTE_CLUSTER_1).isConnected(), equalTo(false));
86+
assertThat(
87+
clusterInfo.get(REMOTE_CLUSTER_1).getError(),
88+
equalTo("Request timed out before receiving a response from the remote cluster")
89+
);
90+
}
91+
}

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ static TransportVersion def(int id) {
175175
public static final TransportVersion INGEST_REQUEST_INCLUDE_SOURCE_ON_ERROR = def(8_835_00_0);
176176
public static final TransportVersion RESOURCE_DEPRECATION_CHECKS = def(8_836_00_0);
177177
public static final TransportVersion LINEAR_RETRIEVER_SUPPORT = def(8_837_00_0);
178+
public static final TransportVersion TIMEOUT_GET_PARAM_FOR_RESOLVE_CLUSTER = def(8_838_00_0);
178179

179180
/*
180181
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/admin/indices/resolve/ResolveClusterActionRequest.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.common.Strings;
1919
import org.elasticsearch.common.io.stream.StreamInput;
2020
import org.elasticsearch.common.io.stream.StreamOutput;
21+
import org.elasticsearch.core.TimeValue;
2122
import org.elasticsearch.tasks.CancellableTask;
2223
import org.elasticsearch.tasks.Task;
2324
import org.elasticsearch.tasks.TaskId;
@@ -51,6 +52,7 @@ public class ResolveClusterActionRequest extends ActionRequest implements Indice
5152
*/
5253
private boolean localIndicesRequested = false;
5354
private IndicesOptions indicesOptions;
55+
private TimeValue timeout;
5456

5557
// true if the user did not provide any index expression - they only want cluster level info, not index matching
5658
private final boolean clusterInfoOnly;
@@ -89,6 +91,9 @@ public ResolveClusterActionRequest(StreamInput in) throws IOException {
8991
this.clusterInfoOnly = false;
9092
this.isQueryingCluster = false;
9193
}
94+
if (in.getTransportVersion().onOrAfter(TransportVersions.TIMEOUT_GET_PARAM_FOR_RESOLVE_CLUSTER)) {
95+
this.timeout = in.readOptionalTimeValue();
96+
}
9297
}
9398

9499
@Override
@@ -103,6 +108,9 @@ public void writeTo(StreamOutput out) throws IOException {
103108
out.writeBoolean(clusterInfoOnly);
104109
out.writeBoolean(isQueryingCluster);
105110
}
111+
if (out.getTransportVersion().onOrAfter(TransportVersions.TIMEOUT_GET_PARAM_FOR_RESOLVE_CLUSTER)) {
112+
out.writeOptionalTimeValue(timeout);
113+
}
106114
}
107115

108116
static String createVersionErrorMessage(TransportVersion versionFound) {
@@ -124,12 +132,14 @@ public boolean equals(Object o) {
124132
if (this == o) return true;
125133
if (o == null || getClass() != o.getClass()) return false;
126134
ResolveClusterActionRequest request = (ResolveClusterActionRequest) o;
127-
return Arrays.equals(names, request.names) && indicesOptions.equals(request.indicesOptions());
135+
return Arrays.equals(names, request.names)
136+
&& indicesOptions.equals(request.indicesOptions())
137+
&& Objects.equals(timeout, request.timeout);
128138
}
129139

130140
@Override
131141
public int hashCode() {
132-
int result = Objects.hash(indicesOptions);
142+
int result = Objects.hash(indicesOptions, timeout);
133143
result = 31 * result + Arrays.hashCode(names);
134144
return result;
135145
}
@@ -139,6 +149,10 @@ public String[] indices() {
139149
return names;
140150
}
141151

152+
/**
 * Returns the timeout configured on this request.
 *
 * @return the timeout, or {@code null} when none has been set (callers check for {@code null} before applying it)
 */
public TimeValue getTimeout() {
153+
return timeout;
154+
}
155+
142156
public boolean clusterInfoOnly() {
143157
return clusterInfoOnly;
144158
}
@@ -202,6 +216,10 @@ boolean localIndicesPresent(String[] indices) {
202216
return false;
203217
}
204218

219+
/**
 * Sets the timeout for this request, replacing any previously set value.
 *
 * @param timeout the timeout to store; the field is serialized as an optional value, so {@code null} is representable
 */
public void setTimeout(TimeValue timeout) {
220+
this.timeout = timeout;
221+
}
222+
205223
@Override
206224
public String toString() {
207225
return "ResolveClusterActionRequest{"

server/src/main/java/org/elasticsearch/action/admin/indices/resolve/TransportResolveClusterAction.java

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,22 @@
2222
import org.elasticsearch.action.support.ActionFilters;
2323
import org.elasticsearch.action.support.HandledTransportAction;
2424
import org.elasticsearch.action.support.IndicesOptions;
25+
import org.elasticsearch.action.support.ListenerTimeouts;
2526
import org.elasticsearch.action.support.RefCountingRunnable;
2627
import org.elasticsearch.client.internal.RemoteClusterClient;
2728
import org.elasticsearch.cluster.ClusterState;
2829
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2930
import org.elasticsearch.cluster.service.ClusterService;
3031
import org.elasticsearch.common.util.concurrent.EsExecutors;
3132
import org.elasticsearch.core.Strings;
33+
import org.elasticsearch.core.TimeValue;
3234
import org.elasticsearch.index.IndexNotFoundException;
3335
import org.elasticsearch.injection.guice.Inject;
3436
import org.elasticsearch.search.SearchService;
3537
import org.elasticsearch.tasks.CancellableTask;
3638
import org.elasticsearch.tasks.Task;
3739
import org.elasticsearch.threadpool.ThreadPool;
40+
import org.elasticsearch.transport.ConnectTransportException;
3841
import org.elasticsearch.transport.RemoteClusterAware;
3942
import org.elasticsearch.transport.RemoteClusterService;
4043
import org.elasticsearch.transport.TransportService;
@@ -60,12 +63,14 @@ public class TransportResolveClusterAction extends HandledTransportAction<Resolv
6063
);
6164

6265
private static final String DUMMY_INDEX_FOR_OLDER_CLUSTERS = "*:dummy*";
66+
private static final String REMOTE_CONNECTION_TIMEOUT_ERROR = "Request timed out before receiving a response from the remote cluster";
6367

6468
private final Executor searchCoordinationExecutor;
6569
private final ClusterService clusterService;
6670
private final RemoteClusterService remoteClusterService;
6771
private final IndexNameExpressionResolver indexNameExpressionResolver;
6872
private final boolean ccsCheckCompatibility;
73+
private final ThreadPool threadPool;
6974

7075
@Inject
7176
public TransportResolveClusterAction(
@@ -81,6 +86,7 @@ public TransportResolveClusterAction(
8186
this.remoteClusterService = transportService.getRemoteClusterService();
8287
this.indexNameExpressionResolver = indexNameExpressionResolver;
8388
this.ccsCheckCompatibility = SearchService.CCS_VERSION_CHECK_SETTING.get(clusterService.getSettings());
89+
this.threadPool = threadPool;
8490
}
8591

8692
@Override
@@ -93,6 +99,7 @@ protected void doExecuteForked(Task task, ResolveClusterActionRequest request, A
9399
if (ccsCheckCompatibility) {
94100
checkCCSVersionCompatibility(request);
95101
}
102+
96103
assert task instanceof CancellableTask;
97104
final CancellableTask resolveClusterTask = (CancellableTask) task;
98105
ClusterState clusterState = clusterService.state();
@@ -171,7 +178,7 @@ protected void doExecuteForked(Task task, ResolveClusterActionRequest request, A
171178
RemoteClusterClient remoteClusterClient = remoteClusterService.getRemoteClusterClient(
172179
clusterAlias,
173180
searchCoordinationExecutor,
174-
RemoteClusterService.DisconnectedStrategy.FAIL_IF_DISCONNECTED
181+
RemoteClusterService.DisconnectedStrategy.RECONNECT_IF_DISCONNECTED
175182
);
176183
var remoteRequest = new ResolveClusterActionRequest(
177184
originalIndices.indices(),
@@ -205,7 +212,17 @@ public void onFailure(Exception failure) {
205212
return;
206213
}
207214
if (ExceptionsHelper.isRemoteUnavailableException((failure))) {
208-
clusterInfoMap.put(clusterAlias, new ResolveClusterInfo(false, skipUnavailable));
215+
String errorMessage = failure.getMessage();
216+
/*
217+
* If the request timed out, set the error field in the response we send back. This is so that we could
218+
* differentiate between connection error to a remote vs. request time out since the "connected" property
219+
* cannot provide the additional context in the latter case.
220+
*/
221+
if (errorMessage.equals(REMOTE_CONNECTION_TIMEOUT_ERROR)) {
222+
clusterInfoMap.put(clusterAlias, new ResolveClusterInfo(false, skipUnavailable, errorMessage));
223+
} else {
224+
clusterInfoMap.put(clusterAlias, new ResolveClusterInfo(false, skipUnavailable));
225+
}
209226
} else if (ExceptionsHelper.unwrap(
210227
failure,
211228
ElasticsearchSecurityException.class
@@ -326,11 +343,23 @@ public void onFailure(Exception e) {
326343
}
327344
};
328345

329-
remoteClusterClient.execute(
330-
TransportResolveClusterAction.REMOTE_TYPE,
331-
remoteRequest,
332-
ActionListener.releaseAfter(remoteListener, refs.acquire())
333-
);
346+
ActionListener<ResolveClusterActionResponse> resultsListener;
347+
TimeValue timeout = request.getTimeout();
348+
// Wrap the listener with a timeout since a timeout was specified.
349+
if (timeout != null) {
350+
var releaserListener = ActionListener.releaseAfter(remoteListener, refs.acquire());
351+
resultsListener = ListenerTimeouts.wrapWithTimeout(
352+
threadPool,
353+
timeout,
354+
searchCoordinationExecutor,
355+
releaserListener,
356+
ignored -> releaserListener.onFailure(new ConnectTransportException(null, REMOTE_CONNECTION_TIMEOUT_ERROR))
357+
);
358+
} else {
359+
resultsListener = ActionListener.releaseAfter(remoteListener, refs.acquire());
360+
}
361+
362+
remoteClusterClient.execute(TransportResolveClusterAction.REMOTE_TYPE, remoteRequest, resultsListener);
334363
}
335364
}
336365
}

server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestResolveClusterAction.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.client.internal.node.NodeClient;
1717
import org.elasticsearch.common.Strings;
1818
import org.elasticsearch.common.util.set.Sets;
19+
import org.elasticsearch.core.TimeValue;
1920
import org.elasticsearch.rest.BaseRestHandler;
2021
import org.elasticsearch.rest.RestRequest;
2122
import org.elasticsearch.rest.action.RestCancellableNodeClient;
@@ -71,6 +72,12 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
7172
clusterInfoOnly,
7273
true
7374
);
75+
76+
String timeout = request.param("timeout");
77+
if (timeout != null) {
78+
resolveRequest.setTimeout(TimeValue.parseTimeValue(timeout, "timeout"));
79+
}
80+
7481
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin()
7582
.indices()
7683
.execute(TransportResolveClusterAction.TYPE, resolveRequest, new RestToXContentListener<>(channel));

0 commit comments

Comments
 (0)