Skip to content

Commit 6da2eca

Browse files
Force reconnect to remote clusters with a short timeout for CPS
1 parent 0b06d9f commit 6da2eca

File tree

3 files changed

+160
-23
lines changed

3 files changed

+160
-23
lines changed
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.indices.cluster;
11+
12+
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
13+
import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction;
14+
import org.elasticsearch.common.settings.Setting;
15+
import org.elasticsearch.common.settings.Settings;
16+
import org.elasticsearch.common.util.CollectionUtils;
17+
import org.elasticsearch.plugins.ClusterPlugin;
18+
import org.elasticsearch.plugins.Plugin;
19+
import org.elasticsearch.test.AbstractMultiClustersTestCase;
20+
import org.elasticsearch.test.transport.MockTransportService;
21+
import org.elasticsearch.transport.TransportService;
22+
23+
import java.util.Collection;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.concurrent.CountDownLatch;
27+
28+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
29+
30+
public class RemoteFieldCapsForceConnectTimeoutIT extends AbstractMultiClustersTestCase {
31+
private static final String REMOTE_CLUSTER_1 = "cluster-a";
32+
33+
public static class ForceConnectTimeoutPlugin extends Plugin implements ClusterPlugin {
34+
@Override
35+
public List<Setting<?>> getSettings() {
36+
return List.of(ForceConnectTimeoutSetting);
37+
}
38+
}
39+
40+
private static final Setting<String> ForceConnectTimeoutSetting = Setting.simpleString(
41+
"search.ccs.force_connect_timeout",
42+
Setting.Property.NodeScope
43+
);
44+
45+
@Override
46+
protected List<String> remoteClusterAlias() {
47+
return List.of(REMOTE_CLUSTER_1);
48+
}
49+
50+
@Override
51+
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
52+
return CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), ForceConnectTimeoutPlugin.class);
53+
}
54+
55+
@Override
56+
protected Settings nodeSettings() {
57+
/*
58+
* This is the setting that controls how long TransportFieldCapabilitiesAction will wait for establishing a connection
59+
* with a remote. At present, we set it to low 1s to prevent stalling the test for too long -- this is consistent
60+
* with what we've done in other tests.
61+
*/
62+
return Settings.builder().put(super.nodeSettings()).put("search.ccs.force_connect_timeout", "1s").build();
63+
}
64+
65+
@Override
66+
protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
67+
return Map.of(REMOTE_CLUSTER_1, true);
68+
}
69+
70+
public void testTimeoutSetting() {
71+
var latch = new CountDownLatch(1);
72+
for (String nodeName : cluster(LOCAL_CLUSTER).getNodeNames()) {
73+
MockTransportService mts = (MockTransportService) cluster(LOCAL_CLUSTER).getInstance(TransportService.class, nodeName);
74+
75+
mts.addConnectBehavior(
76+
cluster(REMOTE_CLUSTER_1).getInstance(TransportService.class, randomFrom(cluster(REMOTE_CLUSTER_1).getNodeNames())),
77+
((transport, discoveryNode, profile, listener) -> {
78+
try {
79+
latch.await();
80+
} catch (InterruptedException e) {
81+
throw new AssertionError(e);
82+
}
83+
84+
transport.openConnection(discoveryNode, profile, listener);
85+
})
86+
);
87+
}
88+
89+
// Add some dummy data to prove we are communicating fine with the remote.
90+
assertAcked(client(REMOTE_CLUSTER_1).admin().indices().prepareCreate("test-index"));
91+
client(REMOTE_CLUSTER_1).prepareIndex("test-index").setSource("sample-field", "sample-value").get();
92+
client(REMOTE_CLUSTER_1).admin().indices().prepareRefresh("test-index").get();
93+
94+
/*
95+
* Do a full restart so that our custom connect behaviour takes effect since it does not apply to
96+
* pre-existing connections -- they're already established by the time this test runs.
97+
*/
98+
try {
99+
cluster(REMOTE_CLUSTER_1).fullRestart();
100+
} catch (Exception e) {
101+
throw new AssertionError(e);
102+
} finally {
103+
var fieldCapsRequest = new FieldCapabilitiesRequest();
104+
fieldCapsRequest.indices("*", "*:*");
105+
fieldCapsRequest.fields("foo", "bar", "baz");
106+
var result = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, fieldCapsRequest));
107+
108+
latch.countDown();
109+
result.decRef();
110+
}
111+
}
112+
}

server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.lucene.util.automaton.TooComplexToDeterminizeException;
1414
import org.elasticsearch.ExceptionsHelper;
1515
import org.elasticsearch.action.ActionListener;
16+
import org.elasticsearch.action.ActionListenerResponseHandler;
1617
import org.elasticsearch.action.ActionRunnable;
1718
import org.elasticsearch.action.ActionType;
1819
import org.elasticsearch.action.OriginalIndices;
@@ -22,7 +23,7 @@
2223
import org.elasticsearch.action.support.ChannelActionListener;
2324
import org.elasticsearch.action.support.HandledTransportAction;
2425
import org.elasticsearch.action.support.RefCountingRunnable;
25-
import org.elasticsearch.client.internal.RemoteClusterClient;
26+
import org.elasticsearch.action.support.SubscribableListener;
2627
import org.elasticsearch.cluster.ProjectState;
2728
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2829
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -37,6 +38,7 @@
3738
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
3839
import org.elasticsearch.core.Nullable;
3940
import org.elasticsearch.core.Releasable;
41+
import org.elasticsearch.core.TimeValue;
4042
import org.elasticsearch.core.Tuple;
4143
import org.elasticsearch.index.shard.ShardId;
4244
import org.elasticsearch.indices.IndicesService;
@@ -48,9 +50,10 @@
4850
import org.elasticsearch.tasks.Task;
4951
import org.elasticsearch.threadpool.ThreadPool;
5052
import org.elasticsearch.transport.RemoteClusterAware;
51-
import org.elasticsearch.transport.RemoteClusterService;
53+
import org.elasticsearch.transport.Transport;
5254
import org.elasticsearch.transport.TransportChannel;
5355
import org.elasticsearch.transport.TransportRequestHandler;
56+
import org.elasticsearch.transport.TransportRequestOptions;
5457
import org.elasticsearch.transport.TransportService;
5558

5659
import java.util.ArrayList;
@@ -91,6 +94,8 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
9194

9295
private final IndicesService indicesService;
9396
private final boolean ccsCheckCompatibility;
97+
private final ThreadPool threadPool;
98+
private final TimeValue forceConnectTimeoutSecs;
9499

95100
@Inject
96101
public TransportFieldCapabilitiesAction(
@@ -117,14 +122,22 @@ public TransportFieldCapabilitiesAction(
117122
new NodeTransportHandler()
118123
);
119124
this.ccsCheckCompatibility = SearchService.CCS_VERSION_CHECK_SETTING.get(clusterService.getSettings());
125+
this.threadPool = threadPool;
126+
this.forceConnectTimeoutSecs = clusterService.getSettings().getAsTime("search.ccs.force_connect_timeout", null);
120127
}
121128

122129
@Override
123130
protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener<FieldCapabilitiesResponse> listener) {
124131
executeRequest(
125132
task,
126133
request,
127-
(remoteClient, remoteRequest, remoteListener) -> remoteClient.execute(REMOTE_TYPE, remoteRequest, remoteListener),
134+
(transportService, conn, fieldCapabilitiesRequest, responseHandler) -> transportService.sendRequest(
135+
conn,
136+
REMOTE_TYPE.name(),
137+
fieldCapabilitiesRequest,
138+
TransportRequestOptions.EMPTY,
139+
responseHandler
140+
),
128141
listener
129142
);
130143
}
@@ -268,12 +281,6 @@ private void doExecuteForked(
268281
for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
269282
String clusterAlias = remoteIndices.getKey();
270283
OriginalIndices originalIndices = remoteIndices.getValue();
271-
var remoteClusterClient = transportService.getRemoteClusterService()
272-
.getRemoteClusterClient(
273-
clusterAlias,
274-
singleThreadedExecutor,
275-
RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
276-
);
277284
FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(clusterAlias, request, originalIndices, nowInMillis);
278285
ActionListener<FieldCapabilitiesResponse> remoteListener = ActionListener.wrap(response -> {
279286
for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) {
@@ -299,18 +306,34 @@ private void doExecuteForked(
299306
handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex);
300307
}
301308
});
302-
remoteRequestExecutor.executeRemoteRequest(
303-
remoteClusterClient,
304-
remoteRequest,
309+
310+
SubscribableListener<Transport.Connection> connectionListener = new SubscribableListener<>();
311+
if (forceConnectTimeoutSecs != null) {
312+
connectionListener.addTimeout(forceConnectTimeoutSecs, threadPool, singleThreadedExecutor);
313+
}
314+
315+
connectionListener.addListener(
305316
// The underlying transport service may call onFailure with a thread pool other than search_coordinator.
306317
// This fork is a workaround to ensure that the merging of field-caps always occurs on the search_coordinator.
307318
// TODO: remove this workaround once https://github.com/elastic/elasticsearch/issues/107439 is fixed
308319
new ForkingOnFailureActionListener<>(
309320
singleThreadedExecutor,
310321
true,
311322
ActionListener.releaseAfter(remoteListener, refs.acquire())
323+
).delegateFailure(
324+
(responseListener, conn) -> remoteRequestExecutor.executeRemoteRequest(
325+
transportService,
326+
conn,
327+
remoteRequest,
328+
new ActionListenerResponseHandler<>(responseListener, FieldCapabilitiesResponse::new, singleThreadedExecutor)
329+
)
312330
)
313331
);
332+
333+
boolean ensureConnected = forceConnectTimeoutSecs != null
334+
|| transportService.getRemoteClusterService().isSkipUnavailable(clusterAlias) == false;
335+
transportService.getRemoteClusterService()
336+
.maybeEnsureConnectedAndGetConnection(clusterAlias, ensureConnected, connectionListener);
314337
}
315338
}
316339
}
@@ -340,9 +363,10 @@ public void onFailure(Exception e) {
340363

341364
public interface RemoteRequestExecutor {
342365
void executeRemoteRequest(
343-
RemoteClusterClient remoteClient,
366+
TransportService transportService,
367+
Transport.Connection conn,
344368
FieldCapabilitiesRequest remoteRequest,
345-
ActionListener<FieldCapabilitiesResponse> remoteListener
369+
ActionListenerResponseHandler<FieldCapabilitiesResponse> responseHandler
346370
);
347371
}
348372

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,19 @@
88

99
import org.elasticsearch.TransportVersions;
1010
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.ActionListenerResponseHandler;
1112
import org.elasticsearch.action.ActionType;
1213
import org.elasticsearch.action.RemoteClusterActionType;
1314
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
1415
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
1516
import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction;
1617
import org.elasticsearch.action.support.ActionFilters;
1718
import org.elasticsearch.action.support.HandledTransportAction;
18-
import org.elasticsearch.client.internal.RemoteClusterClient;
1919
import org.elasticsearch.common.util.concurrent.EsExecutors;
2020
import org.elasticsearch.injection.guice.Inject;
2121
import org.elasticsearch.tasks.Task;
22+
import org.elasticsearch.transport.Transport;
23+
import org.elasticsearch.transport.TransportRequestOptions;
2224
import org.elasticsearch.transport.TransportService;
2325

2426
/**
@@ -53,15 +55,14 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
5355
}
5456

5557
void executeRemoteRequest(
56-
RemoteClusterClient remoteClient,
58+
TransportService transportService,
59+
Transport.Connection conn,
5760
FieldCapabilitiesRequest remoteRequest,
58-
ActionListener<FieldCapabilitiesResponse> remoteListener
61+
ActionListenerResponseHandler<FieldCapabilitiesResponse> responseHandler
5962
) {
60-
remoteClient.getConnection(remoteRequest, remoteListener.delegateFailure((l, conn) -> {
61-
var remoteAction = conn.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)
62-
? RESOLVE_REMOTE_TYPE
63-
: TransportFieldCapabilitiesAction.REMOTE_TYPE;
64-
remoteClient.execute(conn, remoteAction, remoteRequest, l);
65-
}));
63+
var remoteAction = conn.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)
64+
? RESOLVE_REMOTE_TYPE
65+
: TransportFieldCapabilitiesAction.REMOTE_TYPE;
66+
transportService.sendRequest(conn, remoteAction.name(), remoteRequest, TransportRequestOptions.EMPTY, responseHandler);
6667
}
6768
}

0 commit comments

Comments
 (0)