From 8f6cf0a131997a3c56813437ceeec358a9ab6b94 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 24 Sep 2024 12:48:05 +0100 Subject: [PATCH 1/4] Expose `Connection` to remote clusters Sometimes we might need to invoke different requests on a remote cluster depending on the version of the transport protocol it understands, but today we cannot make that distinction (without starting to execute an action on the remote cluster and failing while serializing the request at least). This commit allows callers access to the underlying `Transport.Connection` instance so that we can implement better BwC logic. --- .../internal/ParentTaskAssigningClient.java | 9 +- .../client/internal/RemoteClusterClient.java | 29 +++-- .../transport/RemoteClusterAwareClient.java | 66 +++++++----- .../ParentTaskAssigningClientTests.java | 35 ++++++ .../transport/RemoteClusterClientTests.java | 101 ++++++++++++------ ...rectToLocalClusterRemoteClusterClient.java | 25 ++++- .../xpack/ccr/CcrLicenseChecker.java | 17 ++- 7 files changed, 207 insertions(+), 75 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/client/internal/ParentTaskAssigningClient.java b/server/src/main/java/org/elasticsearch/client/internal/ParentTaskAssigningClient.java index 8e90a459bcafd..014fef8bdfe56 100644 --- a/server/src/main/java/org/elasticsearch/client/internal/ParentTaskAssigningClient.java +++ b/server/src/main/java/org/elasticsearch/client/internal/ParentTaskAssigningClient.java @@ -18,6 +18,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportResponse; import java.util.concurrent.Executor; @@ -75,12 +76,18 @@ public RemoteClusterClient getRemoteClusterClient( return new RemoteClusterClient() { @Override public void execute( + Transport.Connection connection, RemoteClusterActionType action, Request request, ActionListener listener ) { request.setParentTask(parentTask); - delegate.execute(action, request, listener); + delegate.execute(connection, action, request, listener); + } + + @Override + public void getConnection(Request request, ActionListener listener) { + delegate.getConnection(request, listener); } }; } diff --git a/server/src/main/java/org/elasticsearch/client/internal/RemoteClusterClient.java b/server/src/main/java/org/elasticsearch/client/internal/RemoteClusterClient.java index 56ab07d26b4c7..eb56e523994fe 100644 --- a/server/src/main/java/org/elasticsearch/client/internal/RemoteClusterClient.java +++ b/server/src/main/java/org/elasticsearch/client/internal/RemoteClusterClient.java @@ -13,6 +13,9 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.RemoteClusterActionType; +import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportResponse; /** @@ -21,16 +24,30 @@ public interface RemoteClusterClient { /** * Executes an action, denoted by an {@link ActionType}, on the remote cluster. - * - * @param action The action type to execute. - * @param request The action request. - * @param listener A listener for the response - * @param The request type. - * @param the response type. + */ + default void execute( + RemoteClusterActionType action, + Request request, + ActionListener listener + ) { + SubscribableListener.newForked(connectionListener -> getConnection(request, connectionListener)) + .andThen((responseListener, connection) -> execute(connection, action, request, responseListener)) + .addListener(listener); + } + + /** + * Executes an action, denoted by an {@link ActionType}, using a connection to the remote cluster obtained using {@link #getConnection}. */ void execute( + Transport.Connection connection, RemoteClusterActionType action, Request request, ActionListener listener ); + + /** + * Obtain a connection to the remote cluster for use with the {@link #execute} override that allows to specify the connection. Useful + * for cases where you need to inspect {@link Transport.Connection#getVersion} before deciding the exact remote action to invoke. + */ + void getConnection(@Nullable Request request, ActionListener listener); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java index 126393b688d5d..c21cd4dd2f714 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java @@ -12,8 +12,9 @@ import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.RemoteClusterActionType; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.client.internal.RemoteClusterClient; -import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.core.Nullable; import java.util.concurrent.Executor; @@ -35,41 +36,48 @@ final class RemoteClusterAwareClient implements RemoteClusterClient { @Override public void execute( + Transport.Connection connection, RemoteClusterActionType action, Request request, ActionListener listener ) { - maybeEnsureConnected(listener.delegateFailureAndWrap((delegateListener, v) -> { - final Transport.Connection connection; - try { - if (request instanceof RemoteClusterAwareRequest) { - DiscoveryNode preferredTargetNode = ((RemoteClusterAwareRequest) request).getPreferredTargetNode(); - connection = remoteClusterService.getConnection(preferredTargetNode, clusterAlias); + service.sendRequest( + connection, + action.name(), + request, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(listener, action.getResponseReader(), responseExecutor) + ); + } + + @Override + public void getConnection(@Nullable Request request, ActionListener listener) { + SubscribableListener + + .newForked(ensureConnectedListener -> { + if (ensureConnected) { + remoteClusterService.ensureConnected(clusterAlias, ensureConnectedListener); } else { - connection = remoteClusterService.getConnection(clusterAlias); + ensureConnectedListener.onResponse(null); } - } catch (ConnectTransportException e) { - if (ensureConnected == false) { - // trigger another connection attempt, but don't wait for it to complete - remoteClusterService.ensureConnected(clusterAlias, ActionListener.noop()); + }) + + .andThenApply(ignored -> { + try { + if (request instanceof RemoteClusterAwareRequest remoteClusterAwareRequest) { + return remoteClusterService.getConnection(remoteClusterAwareRequest.getPreferredTargetNode(), clusterAlias); + } else { + return remoteClusterService.getConnection(clusterAlias); + } + } catch (ConnectTransportException e) { + if (ensureConnected == false) { + // trigger another connection attempt, but don't wait for it to complete + remoteClusterService.ensureConnected(clusterAlias, ActionListener.noop()); + } + throw e; } - throw e; - } - service.sendRequest( - connection, - action.name(), - request, - TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(delegateListener, action.getResponseReader(), responseExecutor) - ); - })); - } + }) - private void maybeEnsureConnected(ActionListener ensureConnectedListener) { - if (ensureConnected) { - ActionListener.run(ensureConnectedListener, l -> remoteClusterService.ensureConnected(clusterAlias, l)); - } else { - ensureConnectedListener.onResponse(null); - } + .addListener(listener); } } diff --git a/server/src/test/java/org/elasticsearch/client/internal/ParentTaskAssigningClientTests.java b/server/src/test/java/org/elasticsearch/client/internal/ParentTaskAssigningClientTests.java index 143858625ca76..daeafdca497b6 100644 --- a/server/src/test/java/org/elasticsearch/client/internal/ParentTaskAssigningClientTests.java +++ b/server/src/test/java/org/elasticsearch/client/internal/ParentTaskAssigningClientTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportResponse; import java.util.concurrent.Executor; @@ -83,6 +84,24 @@ public void assertSame(parentTaskId, request.getParentTask()); listener.onFailure(new UnsupportedOperationException("fake remote-cluster client")); } + + @Override + public void execute( + Transport.Connection connection, + RemoteClusterActionType action, + Request request, + ActionListener listener + ) { + execute(action, request, listener); + } + + @Override + public void getConnection( + Request request, + ActionListener listener + ) { + fail("should not be called"); + } }; } }; @@ -107,6 +126,22 @@ public void ) ).getMessage() ); + + assertEquals( + "fake remote-cluster client", + asInstanceOf( + UnsupportedOperationException.class, + safeAwaitFailure( + ClusterStateResponse.class, + listener -> remoteClusterClient.execute( + null, + ClusterStateAction.REMOTE_TYPE, + new ClusterStateRequest(TEST_REQUEST_TIMEOUT), + listener + ) + ) + ).getMessage() + ); } } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java index 9ea75f060a30d..985fd6e10445d 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java @@ -17,7 +17,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.TransportSearchScrollAction; -import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; @@ -103,13 +103,25 @@ public void testConnectAndExecuteRequest() throws Exception { randomFrom(RemoteClusterService.DisconnectedStrategy.values()) ); ClusterStateResponse clusterStateResponse = safeAwait( - listener -> client.execute( - ClusterStateAction.REMOTE_TYPE, - new ClusterStateRequest(TEST_REQUEST_TIMEOUT), + listener -> ActionListener.run( ActionListener.runBefore( listener, () -> assertTrue(Thread.currentThread().getName().contains('[' + TEST_THREAD_POOL_NAME + ']')) - ) + ), + clusterStateResponseListener -> { + final var request = new ClusterStateRequest(TEST_REQUEST_TIMEOUT); + if (randomBoolean()) { + client.execute(ClusterStateAction.REMOTE_TYPE, request, clusterStateResponseListener); + } else { + SubscribableListener.newForked( + l -> client.getConnection(randomFrom(request, null), l) + ) + .andThen( + (l, connection) -> client.execute(connection, ClusterStateAction.REMOTE_TYPE, request, l) + ) + .addListener(clusterStateResponseListener); + } + } ) ); assertNotNull(clusterStateResponse); @@ -169,12 +181,13 @@ public void testEnsureWeReconnect() throws Exception { for (int i = 0; i < 10; i++) { RemoteClusterConnection remoteClusterConnection = remoteClusterService.getRemoteClusterConnection("test"); assertBusy(remoteClusterConnection::assertNoRunningConnections); - ConnectionManager connectionManager = remoteClusterConnection.getConnectionManager(); - Transport.Connection connection = connectionManager.getConnection(remoteNode); - PlainActionFuture closeFuture = new PlainActionFuture<>(); - connection.addCloseListener(closeFuture); - connectionManager.disconnectFromNode(remoteNode); - closeFuture.get(); + + safeAwait(connectionClosedListener -> { + ConnectionManager connectionManager = remoteClusterConnection.getConnectionManager(); + Transport.Connection connection = connectionManager.getConnection(remoteNode); + connection.addCloseListener(connectionClosedListener.map(v -> v)); + connectionManager.disconnectFromNode(remoteNode); + }); var client = remoteClusterService.getRemoteClusterClient( "test", @@ -184,11 +197,21 @@ public void testEnsureWeReconnect() throws Exception { RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE ) ); - ClusterStateResponse clusterStateResponse = safeAwait( - listener -> client.execute(ClusterStateAction.REMOTE_TYPE, new ClusterStateRequest(TEST_REQUEST_TIMEOUT), listener) - ); - assertNotNull(clusterStateResponse); - assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value()); + + if (randomBoolean()) { + final ClusterStateResponse clusterStateResponse = safeAwait( + listener -> client.execute( + ClusterStateAction.REMOTE_TYPE, + new ClusterStateRequest(TEST_REQUEST_TIMEOUT), + listener + ) + ); + assertNotNull(clusterStateResponse); + assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value()); + } else { + final Transport.Connection connection = safeAwait(listener -> client.getConnection(null, listener)); + assertFalse(connection.isClosed()); + } assertTrue(remoteClusterConnection.isNodeConnected(remoteNode)); } } @@ -271,28 +294,42 @@ public void testQuicklySkipUnavailableClusters() throws Exception { assertFalse(remoteClusterService.isRemoteNodeConnected("test", remoteNode)); // check that we quickly fail - ESTestCase.assertThat( - safeAwaitFailure( - ClusterStateResponse.class, - listener -> client.execute( - ClusterStateAction.REMOTE_TYPE, - new ClusterStateRequest(TEST_REQUEST_TIMEOUT), - listener - ) - ), - instanceOf(ConnectTransportException.class) - ); + if (randomBoolean()) { + ESTestCase.assertThat( + safeAwaitFailure( + ClusterStateResponse.class, + listener -> client.execute( + ClusterStateAction.REMOTE_TYPE, + new ClusterStateRequest(TEST_REQUEST_TIMEOUT), + listener + ) + ), + instanceOf(ConnectTransportException.class) + ); + } else { + ESTestCase.assertThat( + safeAwaitFailure(Transport.Connection.class, listener -> client.getConnection(null, listener)), + instanceOf(ConnectTransportException.class) + ); + } } finally { service.clearAllRules(); latch.countDown(); } - assertBusy(() -> { - ClusterStateResponse ignored = safeAwait( - listener -> client.execute(ClusterStateAction.REMOTE_TYPE, new ClusterStateRequest(TEST_REQUEST_TIMEOUT), listener) - ); + assertBusy( // keep retrying on an exception, the goal is to check that we eventually reconnect - }); + randomFrom( + () -> safeAwait( + listener -> client.execute( + ClusterStateAction.REMOTE_TYPE, + new ClusterStateRequest(TEST_REQUEST_TIMEOUT), + listener.map(v -> v) + ) + ), + () -> safeAwait(listener -> client.getConnection(null, listener.map(v -> v))) + ) + ); assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode)); } } diff --git a/test/framework/src/main/java/org/elasticsearch/client/internal/RedirectToLocalClusterRemoteClusterClient.java b/test/framework/src/main/java/org/elasticsearch/client/internal/RedirectToLocalClusterRemoteClusterClient.java index cb2b2a78c1bd6..ff3910b2debfb 100644 --- a/test/framework/src/main/java/org/elasticsearch/client/internal/RedirectToLocalClusterRemoteClusterClient.java +++ b/test/framework/src/main/java/org/elasticsearch/client/internal/RedirectToLocalClusterRemoteClusterClient.java @@ -11,10 +11,10 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.RemoteClusterActionType; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportResponse; /** @@ -22,10 +22,10 @@ */ public class RedirectToLocalClusterRemoteClusterClient implements RemoteClusterClient { - private final ElasticsearchClient delegate; + private final ElasticsearchClient localNodeClient; - public RedirectToLocalClusterRemoteClusterClient(ElasticsearchClient delegate) { - this.delegate = delegate; + public RedirectToLocalClusterRemoteClusterClient(ElasticsearchClient localNodeClient) { + this.localNodeClient = localNodeClient; } @SuppressWarnings("unchecked") @@ -35,6 +35,21 @@ public void Request request, ActionListener listener ) { - delegate.execute(new ActionType(action.name()), request, listener.map(r -> (Response) r)); + localNodeClient.execute(new ActionType<>(action.name()), request, listener.map(r -> (Response) r)); + } + + @Override + public void execute( + Transport.Connection connection, + RemoteClusterActionType action, + Request request, + ActionListener listener + ) { + throw new AssertionError("not implemented on RedirectToLocalClusterRemoteClusterClient"); + } + + @Override + public void getConnection(Request request, ActionListener listener) { + throw new AssertionError("not implemented on RedirectToLocalClusterRemoteClusterClient"); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index b4607e002f27e..2c633a43264f6 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -43,6 +43,7 @@ import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.xpack.ccr.action.CcrRequests; import org.elasticsearch.xpack.ccr.action.ShardChangesAction; @@ -424,6 +425,7 @@ public static RemoteClusterClient wrapRemoteClusterClient( return new RemoteClusterClient() { @Override public void execute( + Transport.Connection connection, RemoteClusterActionType action, Request request, ActionListener listener @@ -434,9 +436,14 @@ public void null, request, listener, - (r, l) -> client.execute(action, r, l) + (r, l) -> client.execute(connection, action, r, l) ); } + + @Override + public void getConnection(Request request, ActionListener listener) { + client.getConnection(request, listener); + } }; } } @@ -466,6 +473,7 @@ private static RemoteClusterClient systemClient(ThreadContext threadContext, Rem return new RemoteClusterClient() { @Override public void execute( + Transport.Connection connection, RemoteClusterActionType action, Request request, ActionListener listener @@ -473,9 +481,14 @@ public void final Supplier supplier = threadContext.newRestorableContext(false); try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { threadContext.markAsSystemContext(); - delegate.execute(action, request, new ContextPreservingActionListener<>(supplier, listener)); + delegate.execute(connection, action, request, new ContextPreservingActionListener<>(supplier, listener)); } } + + @Override + public void getConnection(Request request, ActionListener listener) { + delegate.getConnection(request, listener); + } }; } From 60290581eeb28165510f429288c198a78cc16190 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 24 Sep 2024 16:08:15 +0100 Subject: [PATCH 2/4] Return dummy (null) connection --- .../client/internal/ParentTaskAssigningClientTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/client/internal/ParentTaskAssigningClientTests.java b/server/src/test/java/org/elasticsearch/client/internal/ParentTaskAssigningClientTests.java index daeafdca497b6..b067509f4668c 100644 --- a/server/src/test/java/org/elasticsearch/client/internal/ParentTaskAssigningClientTests.java +++ b/server/src/test/java/org/elasticsearch/client/internal/ParentTaskAssigningClientTests.java @@ -100,7 +100,7 @@ public void getConnection( Request request, ActionListener listener ) { - fail("should not be called"); + listener.onResponse(null); } }; } From 84dcff58f0798beb529aca17daa723009fddf46d Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 24 Sep 2024 17:09:41 +0100 Subject: [PATCH 3/4] Tighten up threading --- .../client/internal/RemoteClusterClient.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/client/internal/RemoteClusterClient.java b/server/src/main/java/org/elasticsearch/client/internal/RemoteClusterClient.java index eb56e523994fe..9e3497601fb57 100644 --- a/server/src/main/java/org/elasticsearch/client/internal/RemoteClusterClient.java +++ b/server/src/main/java/org/elasticsearch/client/internal/RemoteClusterClient.java @@ -13,7 +13,6 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.RemoteClusterActionType; -import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.core.Nullable; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportResponse; @@ -30,9 +29,10 @@ default void Request request, ActionListener listener ) { - SubscribableListener.newForked(connectionListener -> getConnection(request, connectionListener)) - .andThen((responseListener, connection) -> execute(connection, action, request, responseListener)) - .addListener(listener); + getConnection( + request, + listener.delegateFailureAndWrap((responseListener, connection) -> execute(connection, action, request, responseListener)) + ); } /** From 0e6adfc8022e50c8cab0f2db75f3f5b29ab934a3 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 24 Sep 2024 18:16:05 +0100 Subject: [PATCH 4/4] Mox --- .../DefaultCheckpointProviderTests.java | 37 +++++++++++++++++-- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java index 1c38ed50ede39..3df47fb3bc066 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.test.MockLog.LoggingExpectation; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ActionNotFoundTransportException; +import org.elasticsearch.transport.Transport; import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction; import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -358,13 +359,18 @@ public void testCreateNextCheckpointWithRemoteClient() throws InterruptedExcepti String transformId = getTestName(); TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId); + doAnswer(withMockConnection()).when(remoteClient1).getConnection(any(), any()); + doAnswer(withMockConnection()).when(remoteClient2).getConnection(any(), any()); + doAnswer(withMockConnection()).when(remoteClient3).getConnection(any(), any()); + GetCheckpointAction.Response checkpointResponse = new GetCheckpointAction.Response(Map.of("index-1", new long[] { 1L, 2L, 3L })); doAnswer(withResponse(checkpointResponse)).when(client).execute(eq(GetCheckpointAction.INSTANCE), any(), any()); GetCheckpointAction.Response remoteCheckpointResponse = new GetCheckpointAction.Response( Map.of("index-1", new long[] { 4L, 5L, 6L, 7L, 8L }) ); - doAnswer(withResponse(remoteCheckpointResponse)).when(remoteClient1).execute(eq(GetCheckpointAction.REMOTE_TYPE), any(), any()); + doAnswer(withRemoteResponse(remoteCheckpointResponse)).when(remoteClient1) + .execute(any(), eq(GetCheckpointAction.REMOTE_TYPE), any(), any()); RemoteClusterResolver remoteClusterResolver = mock(RemoteClusterResolver.class); @@ -401,18 +407,25 @@ public void testCreateNextCheckpointWithRemoteClients() throws InterruptedExcept String transformId = getTestName(); TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId); + doAnswer(withMockConnection()).when(remoteClient1).getConnection(any(), any()); + doAnswer(withMockConnection()).when(remoteClient2).getConnection(any(), any()); + doAnswer(withMockConnection()).when(remoteClient3).getConnection(any(), any()); + GetCheckpointAction.Response remoteCheckpointResponse1 = new GetCheckpointAction.Response( Map.of("index-1", new long[] { 1L, 2L, 3L }) ); - doAnswer(withResponse(remoteCheckpointResponse1)).when(remoteClient1).execute(eq(GetCheckpointAction.REMOTE_TYPE), any(), any()); + doAnswer(withRemoteResponse(remoteCheckpointResponse1)).when(remoteClient1) + .execute(any(), eq(GetCheckpointAction.REMOTE_TYPE), any(), any()); GetCheckpointAction.Response remoteCheckpointResponse2 = new GetCheckpointAction.Response( Map.of("index-1", new long[] { 4L, 5L, 6L, 7L, 8L }) ); - doAnswer(withResponse(remoteCheckpointResponse2)).when(remoteClient2).execute(eq(GetCheckpointAction.REMOTE_TYPE), any(), any()); + doAnswer(withRemoteResponse(remoteCheckpointResponse2)).when(remoteClient2) + .execute(any(), eq(GetCheckpointAction.REMOTE_TYPE), any(), any()); GetCheckpointAction.Response remoteCheckpointResponse3 = new GetCheckpointAction.Response(Map.of("index-1", new long[] { 9L })); - doAnswer(withResponse(remoteCheckpointResponse3)).when(remoteClient3).execute(eq(GetCheckpointAction.REMOTE_TYPE), any(), any()); + doAnswer(withRemoteResponse(remoteCheckpointResponse3)).when(remoteClient3) + .execute(any(), eq(GetCheckpointAction.REMOTE_TYPE), any(), any()); RemoteClusterResolver remoteClusterResolver = mock(RemoteClusterResolver.class); @@ -483,4 +496,20 @@ private static Answer withResponse(Response response) { return null; }; } + + private static Answer withRemoteResponse(Response response) { + return invocationOnMock -> { + ActionListener listener = invocationOnMock.getArgument(3); + listener.onResponse(response); + return null; + }; + } + + private static Answer withMockConnection() { + return invocationOnMock -> { + ActionListener listener = invocationOnMock.getArgument(1); + listener.onResponse(mock(Transport.Connection.class)); + return null; + }; + } }