Skip to content

Commit ef5e150

Browse files
committed
Expose Connection to remote clusters (elastic#113453)
Sometimes we might need to invoke different requests on a remote cluster depending on the version of the transport protocol it understands, but today we cannot make that distinction (without starting to execute an action on the remote cluster and failing while serializing the request at least). This commit allows callers access to the underlying `Transport.Connection` instance so that we can implement better BwC logic.
1 parent 774a97f commit ef5e150

File tree

8 files changed

+240
-79
lines changed

8 files changed

+240
-79
lines changed

server/src/main/java/org/elasticsearch/client/internal/ParentTaskAssigningClient.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.tasks.Task;
1919
import org.elasticsearch.tasks.TaskId;
2020
import org.elasticsearch.transport.RemoteClusterService;
21+
import org.elasticsearch.transport.Transport;
2122
import org.elasticsearch.transport.TransportResponse;
2223

2324
import java.util.concurrent.Executor;
@@ -75,12 +76,18 @@ public RemoteClusterClient getRemoteClusterClient(
7576
return new RemoteClusterClient() {
7677
@Override
7778
public <Request extends ActionRequest, Response extends TransportResponse> void execute(
79+
Transport.Connection connection,
7880
RemoteClusterActionType<Response> action,
7981
Request request,
8082
ActionListener<Response> listener
8183
) {
8284
request.setParentTask(parentTask);
83-
delegate.execute(action, request, listener);
85+
delegate.execute(connection, action, request, listener);
86+
}
87+
88+
@Override
89+
public <Request extends ActionRequest> void getConnection(Request request, ActionListener<Transport.Connection> listener) {
90+
delegate.getConnection(request, listener);
8491
}
8592
};
8693
}

server/src/main/java/org/elasticsearch/client/internal/RemoteClusterClient.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import org.elasticsearch.action.ActionRequest;
1414
import org.elasticsearch.action.ActionType;
1515
import org.elasticsearch.action.RemoteClusterActionType;
16+
import org.elasticsearch.core.Nullable;
17+
import org.elasticsearch.transport.Transport;
1618
import org.elasticsearch.transport.TransportResponse;
1719

1820
/**
@@ -21,16 +23,31 @@
2123
public interface RemoteClusterClient {
2224
/**
2325
* Executes an action, denoted by an {@link ActionType}, on the remote cluster.
24-
*
25-
* @param action The action type to execute.
26-
* @param request The action request.
27-
* @param listener A listener for the response
28-
* @param <Request> The request type.
29-
* @param <Response> the response type.
26+
*/
27+
default <Request extends ActionRequest, Response extends TransportResponse> void execute(
28+
RemoteClusterActionType<Response> action,
29+
Request request,
30+
ActionListener<Response> listener
31+
) {
32+
getConnection(
33+
request,
34+
listener.delegateFailureAndWrap((responseListener, connection) -> execute(connection, action, request, responseListener))
35+
);
36+
}
37+
38+
/**
39+
* Executes an action, denoted by an {@link ActionType}, using a connection to the remote cluster obtained using {@link #getConnection}.
3040
*/
3141
<Request extends ActionRequest, Response extends TransportResponse> void execute(
42+
Transport.Connection connection,
3243
RemoteClusterActionType<Response> action,
3344
Request request,
3445
ActionListener<Response> listener
3546
);
47+
48+
/**
49+
* Obtain a connection to the remote cluster for use with the {@link #execute} override that allows to specify the connection. Useful
50+
* for cases where you need to inspect {@link Transport.Connection#getVersion} before deciding the exact remote action to invoke.
51+
*/
52+
<Request extends ActionRequest> void getConnection(@Nullable Request request, ActionListener<Transport.Connection> listener);
3653
}

server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@
1212
import org.elasticsearch.action.ActionListenerResponseHandler;
1313
import org.elasticsearch.action.ActionRequest;
1414
import org.elasticsearch.action.RemoteClusterActionType;
15+
import org.elasticsearch.action.support.SubscribableListener;
1516
import org.elasticsearch.client.internal.RemoteClusterClient;
16-
import org.elasticsearch.cluster.node.DiscoveryNode;
17+
import org.elasticsearch.core.Nullable;
1718

1819
import java.util.concurrent.Executor;
1920

@@ -35,41 +36,48 @@ final class RemoteClusterAwareClient implements RemoteClusterClient {
3536

3637
@Override
3738
public <Request extends ActionRequest, Response extends TransportResponse> void execute(
39+
Transport.Connection connection,
3840
RemoteClusterActionType<Response> action,
3941
Request request,
4042
ActionListener<Response> listener
4143
) {
42-
maybeEnsureConnected(listener.delegateFailureAndWrap((delegateListener, v) -> {
43-
final Transport.Connection connection;
44-
try {
45-
if (request instanceof RemoteClusterAwareRequest) {
46-
DiscoveryNode preferredTargetNode = ((RemoteClusterAwareRequest) request).getPreferredTargetNode();
47-
connection = remoteClusterService.getConnection(preferredTargetNode, clusterAlias);
44+
service.sendRequest(
45+
connection,
46+
action.name(),
47+
request,
48+
TransportRequestOptions.EMPTY,
49+
new ActionListenerResponseHandler<>(listener, action.getResponseReader(), responseExecutor)
50+
);
51+
}
52+
53+
@Override
54+
public <Request extends ActionRequest> void getConnection(@Nullable Request request, ActionListener<Transport.Connection> listener) {
55+
SubscribableListener
56+
57+
.<Void>newForked(ensureConnectedListener -> {
58+
if (ensureConnected) {
59+
remoteClusterService.ensureConnected(clusterAlias, ensureConnectedListener);
4860
} else {
49-
connection = remoteClusterService.getConnection(clusterAlias);
61+
ensureConnectedListener.onResponse(null);
5062
}
51-
} catch (ConnectTransportException e) {
52-
if (ensureConnected == false) {
53-
// trigger another connection attempt, but don't wait for it to complete
54-
remoteClusterService.ensureConnected(clusterAlias, ActionListener.noop());
63+
})
64+
65+
.andThenApply(ignored -> {
66+
try {
67+
if (request instanceof RemoteClusterAwareRequest remoteClusterAwareRequest) {
68+
return remoteClusterService.getConnection(remoteClusterAwareRequest.getPreferredTargetNode(), clusterAlias);
69+
} else {
70+
return remoteClusterService.getConnection(clusterAlias);
71+
}
72+
} catch (ConnectTransportException e) {
73+
if (ensureConnected == false) {
74+
// trigger another connection attempt, but don't wait for it to complete
75+
remoteClusterService.ensureConnected(clusterAlias, ActionListener.noop());
76+
}
77+
throw e;
5578
}
56-
throw e;
57-
}
58-
service.sendRequest(
59-
connection,
60-
action.name(),
61-
request,
62-
TransportRequestOptions.EMPTY,
63-
new ActionListenerResponseHandler<>(delegateListener, action.getResponseReader(), responseExecutor)
64-
);
65-
}));
66-
}
79+
})
6780

68-
private void maybeEnsureConnected(ActionListener<Void> ensureConnectedListener) {
69-
if (ensureConnected) {
70-
ActionListener.run(ensureConnectedListener, l -> remoteClusterService.ensureConnected(clusterAlias, l));
71-
} else {
72-
ensureConnectedListener.onResponse(null);
73-
}
81+
.addListener(listener);
7482
}
7583
}

server/src/test/java/org/elasticsearch/client/internal/ParentTaskAssigningClientTests.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.test.ESTestCase;
2626
import org.elasticsearch.test.client.NoOpClient;
2727
import org.elasticsearch.transport.RemoteClusterService;
28+
import org.elasticsearch.transport.Transport;
2829
import org.elasticsearch.transport.TransportResponse;
2930

3031
import java.util.concurrent.Executor;
@@ -83,6 +84,24 @@ public <Request extends ActionRequest, Response extends TransportResponse> void
8384
assertSame(parentTaskId, request.getParentTask());
8485
listener.onFailure(new UnsupportedOperationException("fake remote-cluster client"));
8586
}
87+
88+
@Override
89+
public <Request extends ActionRequest, Response extends TransportResponse> void execute(
90+
Transport.Connection connection,
91+
RemoteClusterActionType<Response> action,
92+
Request request,
93+
ActionListener<Response> listener
94+
) {
95+
execute(action, request, listener);
96+
}
97+
98+
@Override
99+
public <Request extends ActionRequest> void getConnection(
100+
Request request,
101+
ActionListener<Transport.Connection> listener
102+
) {
103+
listener.onResponse(null);
104+
}
86105
};
87106
}
88107
};
@@ -107,6 +126,22 @@ public <Request extends ActionRequest, Response extends TransportResponse> void
107126
)
108127
).getMessage()
109128
);
129+
130+
assertEquals(
131+
"fake remote-cluster client",
132+
asInstanceOf(
133+
UnsupportedOperationException.class,
134+
safeAwaitFailure(
135+
ClusterStateResponse.class,
136+
listener -> remoteClusterClient.execute(
137+
null,
138+
ClusterStateAction.REMOTE_TYPE,
139+
new ClusterStateRequest(TEST_REQUEST_TIMEOUT),
140+
listener
141+
)
142+
)
143+
).getMessage()
144+
);
110145
}
111146
}
112147
}

server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java

Lines changed: 69 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import org.elasticsearch.action.search.SearchResponse;
1818
import org.elasticsearch.action.search.SearchScrollRequest;
1919
import org.elasticsearch.action.search.TransportSearchScrollAction;
20-
import org.elasticsearch.action.support.PlainActionFuture;
20+
import org.elasticsearch.action.support.SubscribableListener;
2121
import org.elasticsearch.cluster.ClusterName;
2222
import org.elasticsearch.cluster.node.DiscoveryNode;
2323
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
@@ -103,13 +103,25 @@ public void testConnectAndExecuteRequest() throws Exception {
103103
randomFrom(RemoteClusterService.DisconnectedStrategy.values())
104104
);
105105
ClusterStateResponse clusterStateResponse = safeAwait(
106-
listener -> client.execute(
107-
ClusterStateAction.REMOTE_TYPE,
108-
new ClusterStateRequest(TEST_REQUEST_TIMEOUT),
106+
listener -> ActionListener.run(
109107
ActionListener.runBefore(
110108
listener,
111109
() -> assertTrue(Thread.currentThread().getName().contains('[' + TEST_THREAD_POOL_NAME + ']'))
112-
)
110+
),
111+
clusterStateResponseListener -> {
112+
final var request = new ClusterStateRequest(TEST_REQUEST_TIMEOUT);
113+
if (randomBoolean()) {
114+
client.execute(ClusterStateAction.REMOTE_TYPE, request, clusterStateResponseListener);
115+
} else {
116+
SubscribableListener.<Transport.Connection>newForked(
117+
l -> client.getConnection(randomFrom(request, null), l)
118+
)
119+
.<ClusterStateResponse>andThen(
120+
(l, connection) -> client.execute(connection, ClusterStateAction.REMOTE_TYPE, request, l)
121+
)
122+
.addListener(clusterStateResponseListener);
123+
}
124+
}
113125
)
114126
);
115127
assertNotNull(clusterStateResponse);
@@ -169,12 +181,13 @@ public void testEnsureWeReconnect() throws Exception {
169181
for (int i = 0; i < 10; i++) {
170182
RemoteClusterConnection remoteClusterConnection = remoteClusterService.getRemoteClusterConnection("test");
171183
assertBusy(remoteClusterConnection::assertNoRunningConnections);
172-
ConnectionManager connectionManager = remoteClusterConnection.getConnectionManager();
173-
Transport.Connection connection = connectionManager.getConnection(remoteNode);
174-
PlainActionFuture<Void> closeFuture = new PlainActionFuture<>();
175-
connection.addCloseListener(closeFuture);
176-
connectionManager.disconnectFromNode(remoteNode);
177-
closeFuture.get();
184+
185+
safeAwait(connectionClosedListener -> {
186+
ConnectionManager connectionManager = remoteClusterConnection.getConnectionManager();
187+
Transport.Connection connection = connectionManager.getConnection(remoteNode);
188+
connection.addCloseListener(connectionClosedListener.map(v -> v));
189+
connectionManager.disconnectFromNode(remoteNode);
190+
});
178191

179192
var client = remoteClusterService.getRemoteClusterClient(
180193
"test",
@@ -184,11 +197,21 @@ public void testEnsureWeReconnect() throws Exception {
184197
RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
185198
)
186199
);
187-
ClusterStateResponse clusterStateResponse = safeAwait(
188-
listener -> client.execute(ClusterStateAction.REMOTE_TYPE, new ClusterStateRequest(TEST_REQUEST_TIMEOUT), listener)
189-
);
190-
assertNotNull(clusterStateResponse);
191-
assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value());
200+
201+
if (randomBoolean()) {
202+
final ClusterStateResponse clusterStateResponse = safeAwait(
203+
listener -> client.execute(
204+
ClusterStateAction.REMOTE_TYPE,
205+
new ClusterStateRequest(TEST_REQUEST_TIMEOUT),
206+
listener
207+
)
208+
);
209+
assertNotNull(clusterStateResponse);
210+
assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value());
211+
} else {
212+
final Transport.Connection connection = safeAwait(listener -> client.getConnection(null, listener));
213+
assertFalse(connection.isClosed());
214+
}
192215
assertTrue(remoteClusterConnection.isNodeConnected(remoteNode));
193216
}
194217
}
@@ -271,28 +294,42 @@ public void testQuicklySkipUnavailableClusters() throws Exception {
271294
assertFalse(remoteClusterService.isRemoteNodeConnected("test", remoteNode));
272295

273296
// check that we quickly fail
274-
ESTestCase.assertThat(
275-
safeAwaitFailure(
276-
ClusterStateResponse.class,
277-
listener -> client.execute(
278-
ClusterStateAction.REMOTE_TYPE,
279-
new ClusterStateRequest(TEST_REQUEST_TIMEOUT),
280-
listener
281-
)
282-
),
283-
instanceOf(ConnectTransportException.class)
284-
);
297+
if (randomBoolean()) {
298+
ESTestCase.assertThat(
299+
safeAwaitFailure(
300+
ClusterStateResponse.class,
301+
listener -> client.execute(
302+
ClusterStateAction.REMOTE_TYPE,
303+
new ClusterStateRequest(TEST_REQUEST_TIMEOUT),
304+
listener
305+
)
306+
),
307+
instanceOf(ConnectTransportException.class)
308+
);
309+
} else {
310+
ESTestCase.assertThat(
311+
safeAwaitFailure(Transport.Connection.class, listener -> client.getConnection(null, listener)),
312+
instanceOf(ConnectTransportException.class)
313+
);
314+
}
285315
} finally {
286316
service.clearAllRules();
287317
latch.countDown();
288318
}
289319

290-
assertBusy(() -> {
291-
ClusterStateResponse ignored = safeAwait(
292-
listener -> client.execute(ClusterStateAction.REMOTE_TYPE, new ClusterStateRequest(TEST_REQUEST_TIMEOUT), listener)
293-
);
320+
assertBusy(
294321
// keep retrying on an exception, the goal is to check that we eventually reconnect
295-
});
322+
randomFrom(
323+
() -> safeAwait(
324+
listener -> client.execute(
325+
ClusterStateAction.REMOTE_TYPE,
326+
new ClusterStateRequest(TEST_REQUEST_TIMEOUT),
327+
listener.map(v -> v)
328+
)
329+
),
330+
() -> safeAwait(listener -> client.getConnection(null, listener.map(v -> v)))
331+
)
332+
);
296333
assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode));
297334
}
298335
}

0 commit comments

Comments
 (0)