|
15 | 15 | import org.elasticsearch.Version; |
16 | 16 | import org.elasticsearch.client.Request; |
17 | 17 | import org.elasticsearch.client.Response; |
| 18 | +import org.elasticsearch.client.ResponseListener; |
18 | 19 | import org.elasticsearch.client.RestClient; |
19 | 20 | import org.elasticsearch.common.settings.Settings; |
20 | 21 | import org.elasticsearch.core.IOUtils; |
|
38 | 39 | import java.util.List; |
39 | 40 | import java.util.Locale; |
40 | 41 | import java.util.Set; |
| 42 | +import java.util.concurrent.CompletableFuture; |
| 43 | +import java.util.concurrent.ExecutionException; |
41 | 44 | import java.util.regex.Pattern; |
42 | 45 | import java.util.stream.Collectors; |
43 | 46 |
|
@@ -251,10 +254,7 @@ static RestClient twoClients(RestClient localClient, RestClient remoteClient) th |
251 | 254 | return bulkClient.performRequest(request); |
252 | 255 | } else { |
253 | 256 | Request[] clones = cloneRequests(request, 2); |
254 | | - Response resp1 = remoteClient.performRequest(clones[0]); |
255 | | - Response resp2 = localClient.performRequest(clones[1]); |
256 | | - assertEquals(resp1.getStatusLine().getStatusCode(), resp2.getStatusLine().getStatusCode()); |
257 | | - return resp2; |
| 257 | + return runInParallel(localClient, remoteClient, clones); |
258 | 258 | } |
259 | 259 | }); |
260 | 260 | doAnswer(invocation -> { |
@@ -289,6 +289,44 @@ static Request[] cloneRequests(Request orig, int numClones) throws IOException { |
289 | 289 | return clones; |
290 | 290 | } |
291 | 291 |
|
| 292 | + /** |
| 293 | + * Run {@link #cloneRequests cloned} requests in parallel. |
| 294 | + */ |
| 295 | + static Response runInParallel(RestClient localClient, RestClient remoteClient, Request[] clones) throws Throwable { |
| 296 | + CompletableFuture<Response> remoteResponse = new CompletableFuture<>(); |
| 297 | + CompletableFuture<Response> localResponse = new CompletableFuture<>(); |
| 298 | + remoteClient.performRequestAsync(clones[0], new ResponseListener() { |
| 299 | + @Override |
| 300 | + public void onSuccess(Response response) { |
| 301 | + remoteResponse.complete(response); |
| 302 | + } |
| 303 | + |
| 304 | + @Override |
| 305 | + public void onFailure(Exception exception) { |
| 306 | + remoteResponse.completeExceptionally(exception); |
| 307 | + } |
| 308 | + }); |
| 309 | + localClient.performRequestAsync(clones[1], new ResponseListener() { |
| 310 | + @Override |
| 311 | + public void onSuccess(Response response) { |
| 312 | + localResponse.complete(response); |
| 313 | + } |
| 314 | + |
| 315 | + @Override |
| 316 | + public void onFailure(Exception exception) { |
| 317 | + localResponse.completeExceptionally(exception); |
| 318 | + } |
| 319 | + }); |
| 320 | + try { |
| 321 | + Response remote = remoteResponse.get(); |
| 322 | + Response local = localResponse.get(); |
| 323 | + assertEquals(remote.getStatusLine().getStatusCode(), local.getStatusLine().getStatusCode()); |
| 324 | + return local; |
| 325 | + } catch (ExecutionException e) { |
| 326 | + throw e.getCause(); |
| 327 | + } |
| 328 | + } |
| 329 | + |
292 | 330 | /** |
293 | 331 | * Convert FROM employees ... => FROM *:employees,employees |
294 | 332 | */ |
|
0 commit comments