diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index 51907b4da0f96..f4c402bab2f22 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -15,6 +15,7 @@ import org.elasticsearch.Version; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseListener; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.IOUtils; @@ -40,6 +41,8 @@ import java.util.Locale; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -248,10 +251,7 @@ static RestClient twoClients(RestClient localClient, RestClient remoteClient) th return bulkClient.performRequest(request); } else { Request[] clones = cloneRequests(request, 2); - Response resp1 = remoteClient.performRequest(clones[0]); - Response resp2 = localClient.performRequest(clones[1]); - assertEquals(resp1.getStatusLine().getStatusCode(), resp2.getStatusLine().getStatusCode()); - return resp2; + return runInParallel(localClient, remoteClient, clones); } }); doAnswer(invocation -> { @@ -286,6 +286,44 @@ static Request[] cloneRequests(Request orig, int numClones) throws IOException { return clones; } + /** + * Run {@link #cloneRequests cloned} requests in parallel. + */ + static Response runInParallel(RestClient localClient, RestClient remoteClient, Request[] clones) throws Throwable { + CompletableFuture remoteResponse = new CompletableFuture<>(); + CompletableFuture localResponse = new CompletableFuture<>(); + remoteClient.performRequestAsync(clones[0], new ResponseListener() { + @Override + public void onSuccess(Response response) { + remoteResponse.complete(response); + } + + @Override + public void onFailure(Exception exception) { + remoteResponse.completeExceptionally(exception); + } + }); + localClient.performRequestAsync(clones[1], new ResponseListener() { + @Override + public void onSuccess(Response response) { + localResponse.complete(response); + } + + @Override + public void onFailure(Exception exception) { + localResponse.completeExceptionally(exception); + } + }); + try { + Response remote = remoteResponse.get(); + Response local = localResponse.get(); + assertEquals(remote.getStatusLine().getStatusCode(), local.getStatusLine().getStatusCode()); + return local; + } catch (ExecutionException e) { + throw e.getCause(); + } + } + /** * Convert FROM employees ... => FROM *:employees,employees */