diff --git a/server/src/main/java/org/elasticsearch/client/internal/ParentTaskAssigningClient.java b/server/src/main/java/org/elasticsearch/client/internal/ParentTaskAssigningClient.java index 8e90a459bcafd..014fef8bdfe56 100644 --- a/server/src/main/java/org/elasticsearch/client/internal/ParentTaskAssigningClient.java +++ b/server/src/main/java/org/elasticsearch/client/internal/ParentTaskAssigningClient.java @@ -18,6 +18,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportResponse; import java.util.concurrent.Executor; @@ -75,12 +76,18 @@ public RemoteClusterClient getRemoteClusterClient( return new RemoteClusterClient() { @Override public void execute( + Transport.Connection connection, RemoteClusterActionType action, Request request, ActionListener listener ) { request.setParentTask(parentTask); - delegate.execute(action, request, listener); + delegate.execute(connection, action, request, listener); + } + + @Override + public void getConnection(Request request, ActionListener listener) { + delegate.getConnection(request, listener); } }; } diff --git a/server/src/main/java/org/elasticsearch/client/internal/RemoteClusterClient.java b/server/src/main/java/org/elasticsearch/client/internal/RemoteClusterClient.java index 56ab07d26b4c7..9e3497601fb57 100644 --- a/server/src/main/java/org/elasticsearch/client/internal/RemoteClusterClient.java +++ b/server/src/main/java/org/elasticsearch/client/internal/RemoteClusterClient.java @@ -13,6 +13,8 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.RemoteClusterActionType; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportResponse; /** @@ -21,16 +23,31 @@ public interface RemoteClusterClient { /** * Executes an action, denoted by an {@link ActionType}, on the remote cluster. - * - * @param action The action type to execute. - * @param request The action request. - * @param listener A listener for the response - * @param The request type. - * @param the response type. + */ + default void execute( + RemoteClusterActionType action, + Request request, + ActionListener listener + ) { + getConnection( + request, + listener.delegateFailureAndWrap((responseListener, connection) -> execute(connection, action, request, responseListener)) + ); + } + + /** + * Executes an action, denoted by an {@link ActionType}, using a connection to the remote cluster obtained using {@link #getConnection}. */ void execute( + Transport.Connection connection, RemoteClusterActionType action, Request request, ActionListener listener ); + + /** + * Obtain a connection to the remote cluster for use with the {@link #execute} override that allows to specify the connection. Useful + * for cases where you need to inspect {@link Transport.Connection#getVersion} before deciding the exact remote action to invoke. + */ + void getConnection(@Nullable Request request, ActionListener listener); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java index 126393b688d5d..c21cd4dd2f714 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java @@ -12,8 +12,9 @@ import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.RemoteClusterActionType; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.client.internal.RemoteClusterClient; -import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.core.Nullable; import java.util.concurrent.Executor; @@ -35,41 +36,48 @@ final class RemoteClusterAwareClient implements RemoteClusterClient { @Override public void execute( + Transport.Connection connection, RemoteClusterActionType action, Request request, ActionListener listener ) { - maybeEnsureConnected(listener.delegateFailureAndWrap((delegateListener, v) -> { - final Transport.Connection connection; - try { - if (request instanceof RemoteClusterAwareRequest) { - DiscoveryNode preferredTargetNode = ((RemoteClusterAwareRequest) request).getPreferredTargetNode(); - connection = remoteClusterService.getConnection(preferredTargetNode, clusterAlias); + service.sendRequest( + connection, + action.name(), + request, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(listener, action.getResponseReader(), responseExecutor) + ); + } + + @Override + public void getConnection(@Nullable Request request, ActionListener listener) { + SubscribableListener + + .newForked(ensureConnectedListener -> { + if (ensureConnected) { + remoteClusterService.ensureConnected(clusterAlias, ensureConnectedListener); } else { - connection = remoteClusterService.getConnection(clusterAlias); + ensureConnectedListener.onResponse(null); } - } catch (ConnectTransportException e) { - if (ensureConnected == false) { - // trigger another connection attempt, but don't wait for it to complete - remoteClusterService.ensureConnected(clusterAlias, ActionListener.noop()); + }) + + .andThenApply(ignored -> { + try { + if (request instanceof RemoteClusterAwareRequest remoteClusterAwareRequest) { + return remoteClusterService.getConnection(remoteClusterAwareRequest.getPreferredTargetNode(), clusterAlias); + } else { + return remoteClusterService.getConnection(clusterAlias); + } + } catch (ConnectTransportException e) { + if (ensureConnected == false) { + // trigger another connection attempt, but don't wait for it to complete + remoteClusterService.ensureConnected(clusterAlias, ActionListener.noop()); + } + throw e; } - throw e; - } - service.sendRequest( - connection, - action.name(), - request, - TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(delegateListener, action.getResponseReader(), responseExecutor) - ); - })); - } + }) - private void maybeEnsureConnected(ActionListener ensureConnectedListener) { - if (ensureConnected) { - ActionListener.run(ensureConnectedListener, l -> remoteClusterService.ensureConnected(clusterAlias, l)); - } else { - ensureConnectedListener.onResponse(null); - } + .addListener(listener); } } diff --git a/server/src/test/java/org/elasticsearch/client/internal/ParentTaskAssigningClientTests.java b/server/src/test/java/org/elasticsearch/client/internal/ParentTaskAssigningClientTests.java index 143858625ca76..b067509f4668c 100644 --- a/server/src/test/java/org/elasticsearch/client/internal/ParentTaskAssigningClientTests.java +++ b/server/src/test/java/org/elasticsearch/client/internal/ParentTaskAssigningClientTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportResponse; import java.util.concurrent.Executor; @@ -83,6 +84,24 @@ public void assertSame(parentTaskId, request.getParentTask()); listener.onFailure(new UnsupportedOperationException("fake remote-cluster client")); } + + @Override + public void execute( + Transport.Connection connection, + RemoteClusterActionType action, + Request request, + ActionListener listener + ) { + execute(action, request, listener); + } + + @Override + public void getConnection( + Request request, + ActionListener listener + ) { + listener.onResponse(null); + } }; } }; @@ -107,6 +126,22 @@ public void ) ).getMessage() ); + + assertEquals( + "fake remote-cluster client", + asInstanceOf( + UnsupportedOperationException.class, + safeAwaitFailure( + ClusterStateResponse.class, + listener -> remoteClusterClient.execute( + null, + ClusterStateAction.REMOTE_TYPE, + new ClusterStateRequest(TEST_REQUEST_TIMEOUT), + listener + ) + ) + ).getMessage() + ); } } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java index 9ea75f060a30d..985fd6e10445d 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java @@ -17,7 +17,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.TransportSearchScrollAction; -import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; @@ -103,13 +103,25 @@ public void testConnectAndExecuteRequest() throws Exception { randomFrom(RemoteClusterService.DisconnectedStrategy.values()) ); ClusterStateResponse clusterStateResponse = safeAwait( - listener -> client.execute( - ClusterStateAction.REMOTE_TYPE, - new ClusterStateRequest(TEST_REQUEST_TIMEOUT), + listener -> ActionListener.run( ActionListener.runBefore( listener, () -> assertTrue(Thread.currentThread().getName().contains('[' + TEST_THREAD_POOL_NAME + ']')) - ) + ), + clusterStateResponseListener -> { + final var request = new ClusterStateRequest(TEST_REQUEST_TIMEOUT); + if (randomBoolean()) { + client.execute(ClusterStateAction.REMOTE_TYPE, request, clusterStateResponseListener); + } else { + SubscribableListener.newForked( + l -> client.getConnection(randomFrom(request, null), l) + ) + .andThen( + (l, connection) -> client.execute(connection, ClusterStateAction.REMOTE_TYPE, request, l) + ) + .addListener(clusterStateResponseListener); + } + } ) ); assertNotNull(clusterStateResponse); @@ -169,12 +181,13 @@ public void testEnsureWeReconnect() throws Exception { for (int i = 0; i < 10; i++) { RemoteClusterConnection remoteClusterConnection = remoteClusterService.getRemoteClusterConnection("test"); assertBusy(remoteClusterConnection::assertNoRunningConnections); - ConnectionManager connectionManager = remoteClusterConnection.getConnectionManager(); - Transport.Connection connection = connectionManager.getConnection(remoteNode); - PlainActionFuture closeFuture = new PlainActionFuture<>(); - connection.addCloseListener(closeFuture); - connectionManager.disconnectFromNode(remoteNode); - closeFuture.get(); + + safeAwait(connectionClosedListener -> { + ConnectionManager connectionManager = remoteClusterConnection.getConnectionManager(); + Transport.Connection connection = connectionManager.getConnection(remoteNode); + connection.addCloseListener(connectionClosedListener.map(v -> v)); + connectionManager.disconnectFromNode(remoteNode); + }); var client = remoteClusterService.getRemoteClusterClient( "test", @@ -184,11 +197,21 @@ public void testEnsureWeReconnect() throws Exception { RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE ) ); - ClusterStateResponse clusterStateResponse = safeAwait( - listener -> client.execute(ClusterStateAction.REMOTE_TYPE, new ClusterStateRequest(TEST_REQUEST_TIMEOUT), listener) - ); - assertNotNull(clusterStateResponse); - assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value()); + + if (randomBoolean()) { + final ClusterStateResponse clusterStateResponse = safeAwait( + listener -> client.execute( + ClusterStateAction.REMOTE_TYPE, + new ClusterStateRequest(TEST_REQUEST_TIMEOUT), + listener + ) + ); + assertNotNull(clusterStateResponse); + assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value()); + } else { + final Transport.Connection connection = safeAwait(listener -> client.getConnection(null, listener)); + assertFalse(connection.isClosed()); + } assertTrue(remoteClusterConnection.isNodeConnected(remoteNode)); } } @@ -271,28 +294,42 @@ public void testQuicklySkipUnavailableClusters() throws Exception { assertFalse(remoteClusterService.isRemoteNodeConnected("test", remoteNode)); // check that we quickly fail - ESTestCase.assertThat( - safeAwaitFailure( - ClusterStateResponse.class, - listener -> client.execute( - ClusterStateAction.REMOTE_TYPE, - new ClusterStateRequest(TEST_REQUEST_TIMEOUT), - listener - ) - ), - instanceOf(ConnectTransportException.class) - ); + if (randomBoolean()) { + ESTestCase.assertThat( + safeAwaitFailure( + ClusterStateResponse.class, + listener -> client.execute( + ClusterStateAction.REMOTE_TYPE, + new ClusterStateRequest(TEST_REQUEST_TIMEOUT), + listener + ) + ), + instanceOf(ConnectTransportException.class) + ); + } else { + ESTestCase.assertThat( + safeAwaitFailure(Transport.Connection.class, listener -> client.getConnection(null, listener)), + instanceOf(ConnectTransportException.class) + ); + } } finally { service.clearAllRules(); latch.countDown(); } - assertBusy(() -> { - ClusterStateResponse ignored = safeAwait( - listener -> client.execute(ClusterStateAction.REMOTE_TYPE, new ClusterStateRequest(TEST_REQUEST_TIMEOUT), listener) - ); + assertBusy( // keep retrying on an exception, the goal is to check that we eventually reconnect - }); + randomFrom( + () -> safeAwait( + listener -> client.execute( + ClusterStateAction.REMOTE_TYPE, + new ClusterStateRequest(TEST_REQUEST_TIMEOUT), + listener.map(v -> v) + ) + ), + () -> safeAwait(listener -> client.getConnection(null, listener.map(v -> v))) + ) + ); assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode)); } } diff --git a/test/framework/src/main/java/org/elasticsearch/client/internal/RedirectToLocalClusterRemoteClusterClient.java b/test/framework/src/main/java/org/elasticsearch/client/internal/RedirectToLocalClusterRemoteClusterClient.java index cb2b2a78c1bd6..ff3910b2debfb 100644 --- a/test/framework/src/main/java/org/elasticsearch/client/internal/RedirectToLocalClusterRemoteClusterClient.java +++ b/test/framework/src/main/java/org/elasticsearch/client/internal/RedirectToLocalClusterRemoteClusterClient.java @@ -11,10 +11,10 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.RemoteClusterActionType; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportResponse; /** @@ -22,10 +22,10 @@ */ public class RedirectToLocalClusterRemoteClusterClient implements RemoteClusterClient { - private final ElasticsearchClient delegate; + private final ElasticsearchClient localNodeClient; - public RedirectToLocalClusterRemoteClusterClient(ElasticsearchClient delegate) { - this.delegate = delegate; + public RedirectToLocalClusterRemoteClusterClient(ElasticsearchClient localNodeClient) { + this.localNodeClient = localNodeClient; } @SuppressWarnings("unchecked") @@ -35,6 +35,21 @@ public void Request request, ActionListener listener ) { - delegate.execute(new ActionType(action.name()), request, listener.map(r -> (Response) r)); + localNodeClient.execute(new ActionType<>(action.name()), request, listener.map(r -> (Response) r)); + } + + @Override + public void execute( + Transport.Connection connection, + RemoteClusterActionType action, + Request request, + ActionListener listener + ) { + throw new AssertionError("not implemented on RedirectToLocalClusterRemoteClusterClient"); + } + + @Override + public void getConnection(Request request, ActionListener listener) { + throw new AssertionError("not implemented on RedirectToLocalClusterRemoteClusterClient"); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index b4607e002f27e..2c633a43264f6 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -43,6 +43,7 @@ import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.xpack.ccr.action.CcrRequests; import org.elasticsearch.xpack.ccr.action.ShardChangesAction; @@ -424,6 +425,7 @@ public static RemoteClusterClient wrapRemoteClusterClient( return new RemoteClusterClient() { @Override public void execute( + Transport.Connection connection, RemoteClusterActionType action, Request request, ActionListener listener @@ -434,9 +436,14 @@ public void null, request, listener, - (r, l) -> client.execute(action, r, l) + (r, l) -> client.execute(connection, action, r, l) ); } + + @Override + public void getConnection(Request request, ActionListener listener) { + client.getConnection(request, listener); + } }; } } @@ -466,6 +473,7 @@ private static RemoteClusterClient systemClient(ThreadContext threadContext, Rem return new RemoteClusterClient() { @Override public void execute( + Transport.Connection connection, RemoteClusterActionType action, Request request, ActionListener listener @@ -473,9 +481,14 @@ public void final Supplier supplier = threadContext.newRestorableContext(false); try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { threadContext.markAsSystemContext(); - delegate.execute(action, request, new ContextPreservingActionListener<>(supplier, listener)); + delegate.execute(connection, action, request, new ContextPreservingActionListener<>(supplier, listener)); } } + + @Override + public void getConnection(Request request, ActionListener listener) { + delegate.getConnection(request, listener); + } }; } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java index 1c38ed50ede39..3df47fb3bc066 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.test.MockLog.LoggingExpectation; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ActionNotFoundTransportException; +import org.elasticsearch.transport.Transport; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -358,13 +359,18 @@ public void testCreateNextCheckpointWithRemoteClient() throws InterruptedExcepti String transformId = getTestName(); TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId); + doAnswer(withMockConnection()).when(remoteClient1).getConnection(any(), any()); + doAnswer(withMockConnection()).when(remoteClient2).getConnection(any(), any()); + doAnswer(withMockConnection()).when(remoteClient3).getConnection(any(), any()); + GetCheckpointAction.Response checkpointResponse = new GetCheckpointAction.Response(Map.of("index-1", new long[] { 1L, 2L, 3L })); doAnswer(withResponse(checkpointResponse)).when(client).execute(eq(GetCheckpointAction.INSTANCE), any(), any()); GetCheckpointAction.Response remoteCheckpointResponse = new GetCheckpointAction.Response( Map.of("index-1", new long[] { 4L, 5L, 6L, 7L, 8L }) ); - doAnswer(withResponse(remoteCheckpointResponse)).when(remoteClient1).execute(eq(GetCheckpointAction.REMOTE_TYPE), any(), any()); + doAnswer(withRemoteResponse(remoteCheckpointResponse)).when(remoteClient1) + .execute(any(), eq(GetCheckpointAction.REMOTE_TYPE), any(), any()); RemoteClusterResolver remoteClusterResolver = mock(RemoteClusterResolver.class); @@ -401,18 +407,25 @@ public void testCreateNextCheckpointWithRemoteClients() throws InterruptedExcept String transformId = getTestName(); TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId); + doAnswer(withMockConnection()).when(remoteClient1).getConnection(any(), any()); + doAnswer(withMockConnection()).when(remoteClient2).getConnection(any(), any()); + doAnswer(withMockConnection()).when(remoteClient3).getConnection(any(), any()); + GetCheckpointAction.Response remoteCheckpointResponse1 = new GetCheckpointAction.Response( Map.of("index-1", new long[] { 1L, 2L, 3L }) ); - doAnswer(withResponse(remoteCheckpointResponse1)).when(remoteClient1).execute(eq(GetCheckpointAction.REMOTE_TYPE), any(), any()); + doAnswer(withRemoteResponse(remoteCheckpointResponse1)).when(remoteClient1) + .execute(any(), eq(GetCheckpointAction.REMOTE_TYPE), any(), any()); GetCheckpointAction.Response remoteCheckpointResponse2 = new GetCheckpointAction.Response( Map.of("index-1", new long[] { 4L, 5L, 6L, 7L, 8L }) ); - doAnswer(withResponse(remoteCheckpointResponse2)).when(remoteClient2).execute(eq(GetCheckpointAction.REMOTE_TYPE), any(), any()); + doAnswer(withRemoteResponse(remoteCheckpointResponse2)).when(remoteClient2) + .execute(any(), eq(GetCheckpointAction.REMOTE_TYPE), any(), any()); GetCheckpointAction.Response remoteCheckpointResponse3 = new GetCheckpointAction.Response(Map.of("index-1", new long[] { 9L })); - doAnswer(withResponse(remoteCheckpointResponse3)).when(remoteClient3).execute(eq(GetCheckpointAction.REMOTE_TYPE), any(), any()); + doAnswer(withRemoteResponse(remoteCheckpointResponse3)).when(remoteClient3) + .execute(any(), eq(GetCheckpointAction.REMOTE_TYPE), any(), any()); RemoteClusterResolver remoteClusterResolver = mock(RemoteClusterResolver.class); @@ -483,4 +496,20 @@ private static Answer withResponse(Response response) { return null; }; } + + private static Answer withRemoteResponse(Response response) { + return invocationOnMock -> { + ActionListener listener = invocationOnMock.getArgument(3); + listener.onResponse(response); + return null; + }; + } + + private static Answer withMockConnection() { + return invocationOnMock -> { + ActionListener listener = invocationOnMock.getArgument(1); + listener.onResponse(mock(Transport.Connection.class)); + return null; + }; + } }