Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportResponse;

import java.util.concurrent.Executor;
Expand Down Expand Up @@ -75,12 +76,18 @@ public RemoteClusterClient getRemoteClusterClient(
return new RemoteClusterClient() {
@Override
public <Request extends ActionRequest, Response extends TransportResponse> void execute(
Transport.Connection connection,
RemoteClusterActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
request.setParentTask(parentTask);
delegate.execute(action, request, listener);
delegate.execute(connection, action, request, listener);
}

@Override
public <Request extends ActionRequest> void getConnection(Request request, ActionListener<Transport.Connection> listener) {
delegate.getConnection(request, listener);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.RemoteClusterActionType;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportResponse;

/**
Expand All @@ -21,16 +23,31 @@
public interface RemoteClusterClient {
/**
* Executes an action, denoted by an {@link ActionType}, on the remote cluster.
*
* @param action The action type to execute.
* @param request The action request.
* @param listener A listener for the response
* @param <Request> The request type.
* @param <Response> the response type.
*/
default <Request extends ActionRequest, Response extends TransportResponse> void execute(
RemoteClusterActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
getConnection(
request,
listener.delegateFailureAndWrap((responseListener, connection) -> execute(connection, action, request, responseListener))
);
}

/**
* Executes an action, denoted by an {@link ActionType}, using a connection to the remote cluster obtained using {@link #getConnection}.
*/
<Request extends ActionRequest, Response extends TransportResponse> void execute(
Transport.Connection connection,
RemoteClusterActionType<Response> action,
Request request,
ActionListener<Response> listener
);

/**
* Obtain a connection to the remote cluster for use with the {@link #execute} override that allows to specify the connection. Useful
* for cases where you need to inspect {@link Transport.Connection#getVersion} before deciding the exact remote action to invoke.
*/
<Request extends ActionRequest> void getConnection(@Nullable Request request, ActionListener<Transport.Connection> listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.RemoteClusterActionType;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.internal.RemoteClusterClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.core.Nullable;

import java.util.concurrent.Executor;

Expand All @@ -35,41 +36,48 @@ final class RemoteClusterAwareClient implements RemoteClusterClient {

@Override
public <Request extends ActionRequest, Response extends TransportResponse> void execute(
Transport.Connection connection,
RemoteClusterActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
maybeEnsureConnected(listener.delegateFailureAndWrap((delegateListener, v) -> {
final Transport.Connection connection;
try {
if (request instanceof RemoteClusterAwareRequest) {
DiscoveryNode preferredTargetNode = ((RemoteClusterAwareRequest) request).getPreferredTargetNode();
connection = remoteClusterService.getConnection(preferredTargetNode, clusterAlias);
service.sendRequest(
connection,
action.name(),
request,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(listener, action.getResponseReader(), responseExecutor)
);
}

@Override
public <Request extends ActionRequest> void getConnection(@Nullable Request request, ActionListener<Transport.Connection> listener) {
SubscribableListener

.<Void>newForked(ensureConnectedListener -> {
if (ensureConnected) {
remoteClusterService.ensureConnected(clusterAlias, ensureConnectedListener);
} else {
connection = remoteClusterService.getConnection(clusterAlias);
ensureConnectedListener.onResponse(null);
}
} catch (ConnectTransportException e) {
if (ensureConnected == false) {
// trigger another connection attempt, but don't wait for it to complete
remoteClusterService.ensureConnected(clusterAlias, ActionListener.noop());
})

.andThenApply(ignored -> {
try {
if (request instanceof RemoteClusterAwareRequest remoteClusterAwareRequest) {
return remoteClusterService.getConnection(remoteClusterAwareRequest.getPreferredTargetNode(), clusterAlias);
} else {
return remoteClusterService.getConnection(clusterAlias);
}
} catch (ConnectTransportException e) {
if (ensureConnected == false) {
// trigger another connection attempt, but don't wait for it to complete
remoteClusterService.ensureConnected(clusterAlias, ActionListener.noop());
}
throw e;
}
throw e;
}
service.sendRequest(
connection,
action.name(),
request,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(delegateListener, action.getResponseReader(), responseExecutor)
);
}));
}
})

private void maybeEnsureConnected(ActionListener<Void> ensureConnectedListener) {
if (ensureConnected) {
ActionListener.run(ensureConnectedListener, l -> remoteClusterService.ensureConnected(clusterAlias, l));
} else {
ensureConnectedListener.onResponse(null);
}
.addListener(listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportResponse;

import java.util.concurrent.Executor;
Expand Down Expand Up @@ -83,6 +84,24 @@ public <Request extends ActionRequest, Response extends TransportResponse> void
assertSame(parentTaskId, request.getParentTask());
listener.onFailure(new UnsupportedOperationException("fake remote-cluster client"));
}

@Override
public <Request extends ActionRequest, Response extends TransportResponse> void execute(
Transport.Connection connection,
RemoteClusterActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
execute(action, request, listener);
}

@Override
public <Request extends ActionRequest> void getConnection(
Request request,
ActionListener<Transport.Connection> listener
) {
listener.onResponse(null);
}
};
}
};
Expand All @@ -107,6 +126,22 @@ public <Request extends ActionRequest, Response extends TransportResponse> void
)
).getMessage()
);

assertEquals(
"fake remote-cluster client",
asInstanceOf(
UnsupportedOperationException.class,
safeAwaitFailure(
ClusterStateResponse.class,
listener -> remoteClusterClient.execute(
null,
ClusterStateAction.REMOTE_TYPE,
new ClusterStateRequest(TEST_REQUEST_TIMEOUT),
listener
)
)
).getMessage()
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.TransportSearchScrollAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
Expand Down Expand Up @@ -103,13 +103,25 @@ public void testConnectAndExecuteRequest() throws Exception {
randomFrom(RemoteClusterService.DisconnectedStrategy.values())
);
ClusterStateResponse clusterStateResponse = safeAwait(
listener -> client.execute(
ClusterStateAction.REMOTE_TYPE,
new ClusterStateRequest(TEST_REQUEST_TIMEOUT),
listener -> ActionListener.run(
ActionListener.runBefore(
listener,
() -> assertTrue(Thread.currentThread().getName().contains('[' + TEST_THREAD_POOL_NAME + ']'))
)
),
clusterStateResponseListener -> {
final var request = new ClusterStateRequest(TEST_REQUEST_TIMEOUT);
if (randomBoolean()) {
client.execute(ClusterStateAction.REMOTE_TYPE, request, clusterStateResponseListener);
} else {
SubscribableListener.<Transport.Connection>newForked(
l -> client.getConnection(randomFrom(request, null), l)
)
.<ClusterStateResponse>andThen(
(l, connection) -> client.execute(connection, ClusterStateAction.REMOTE_TYPE, request, l)
)
.addListener(clusterStateResponseListener);
}
}
)
);
assertNotNull(clusterStateResponse);
Expand Down Expand Up @@ -169,12 +181,13 @@ public void testEnsureWeReconnect() throws Exception {
for (int i = 0; i < 10; i++) {
RemoteClusterConnection remoteClusterConnection = remoteClusterService.getRemoteClusterConnection("test");
assertBusy(remoteClusterConnection::assertNoRunningConnections);
ConnectionManager connectionManager = remoteClusterConnection.getConnectionManager();
Transport.Connection connection = connectionManager.getConnection(remoteNode);
PlainActionFuture<Void> closeFuture = new PlainActionFuture<>();
connection.addCloseListener(closeFuture);
connectionManager.disconnectFromNode(remoteNode);
closeFuture.get();

safeAwait(connectionClosedListener -> {
ConnectionManager connectionManager = remoteClusterConnection.getConnectionManager();
Transport.Connection connection = connectionManager.getConnection(remoteNode);
connection.addCloseListener(connectionClosedListener.map(v -> v));
connectionManager.disconnectFromNode(remoteNode);
});

var client = remoteClusterService.getRemoteClusterClient(
"test",
Expand All @@ -184,11 +197,21 @@ public void testEnsureWeReconnect() throws Exception {
RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
)
);
ClusterStateResponse clusterStateResponse = safeAwait(
listener -> client.execute(ClusterStateAction.REMOTE_TYPE, new ClusterStateRequest(TEST_REQUEST_TIMEOUT), listener)
);
assertNotNull(clusterStateResponse);
assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value());

if (randomBoolean()) {
final ClusterStateResponse clusterStateResponse = safeAwait(
listener -> client.execute(
ClusterStateAction.REMOTE_TYPE,
new ClusterStateRequest(TEST_REQUEST_TIMEOUT),
listener
)
);
assertNotNull(clusterStateResponse);
assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value());
} else {
final Transport.Connection connection = safeAwait(listener -> client.getConnection(null, listener));
assertFalse(connection.isClosed());
}
assertTrue(remoteClusterConnection.isNodeConnected(remoteNode));
}
}
Expand Down Expand Up @@ -271,28 +294,42 @@ public void testQuicklySkipUnavailableClusters() throws Exception {
assertFalse(remoteClusterService.isRemoteNodeConnected("test", remoteNode));

// check that we quickly fail
ESTestCase.assertThat(
safeAwaitFailure(
ClusterStateResponse.class,
listener -> client.execute(
ClusterStateAction.REMOTE_TYPE,
new ClusterStateRequest(TEST_REQUEST_TIMEOUT),
listener
)
),
instanceOf(ConnectTransportException.class)
);
if (randomBoolean()) {
ESTestCase.assertThat(
safeAwaitFailure(
ClusterStateResponse.class,
listener -> client.execute(
ClusterStateAction.REMOTE_TYPE,
new ClusterStateRequest(TEST_REQUEST_TIMEOUT),
listener
)
),
instanceOf(ConnectTransportException.class)
);
} else {
ESTestCase.assertThat(
safeAwaitFailure(Transport.Connection.class, listener -> client.getConnection(null, listener)),
instanceOf(ConnectTransportException.class)
);
}
} finally {
service.clearAllRules();
latch.countDown();
}

assertBusy(() -> {
ClusterStateResponse ignored = safeAwait(
listener -> client.execute(ClusterStateAction.REMOTE_TYPE, new ClusterStateRequest(TEST_REQUEST_TIMEOUT), listener)
);
assertBusy(
// keep retrying on an exception, the goal is to check that we eventually reconnect
});
randomFrom(
() -> safeAwait(
listener -> client.execute(
ClusterStateAction.REMOTE_TYPE,
new ClusterStateRequest(TEST_REQUEST_TIMEOUT),
listener.map(v -> v)
)
),
() -> safeAwait(listener -> client.getConnection(null, listener.map(v -> v)))
)
);
assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode));
}
}
Expand Down
Loading