| 
15 | 15 | import org.elasticsearch.Version;  | 
16 | 16 | import org.elasticsearch.client.Request;  | 
17 | 17 | import org.elasticsearch.client.Response;  | 
 | 18 | +import org.elasticsearch.client.ResponseListener;  | 
18 | 19 | import org.elasticsearch.client.RestClient;  | 
19 | 20 | import org.elasticsearch.common.settings.Settings;  | 
20 | 21 | import org.elasticsearch.core.IOUtils;  | 
 | 
40 | 41 | import java.util.Locale;  | 
41 | 42 | import java.util.Optional;  | 
42 | 43 | import java.util.Set;  | 
 | 44 | +import java.util.concurrent.CompletableFuture;  | 
 | 45 | +import java.util.concurrent.ExecutionException;  | 
43 | 46 | import java.util.regex.Pattern;  | 
44 | 47 | import java.util.stream.Collectors;  | 
45 | 48 | 
 
  | 
@@ -248,10 +251,7 @@ static RestClient twoClients(RestClient localClient, RestClient remoteClient) th  | 
248 | 251 |                     return bulkClient.performRequest(request);  | 
249 | 252 |                 } else {  | 
250 | 253 |                     Request[] clones = cloneRequests(request, 2);  | 
251 |  | -                    Response resp1 = remoteClient.performRequest(clones[0]);  | 
252 |  | -                    Response resp2 = localClient.performRequest(clones[1]);  | 
253 |  | -                    assertEquals(resp1.getStatusLine().getStatusCode(), resp2.getStatusLine().getStatusCode());  | 
254 |  | -                    return resp2;  | 
 | 254 | +                    return runInParallel(localClient, remoteClient, clones);  | 
255 | 255 |                 }  | 
256 | 256 |         });  | 
257 | 257 |         doAnswer(invocation -> {  | 
@@ -286,6 +286,44 @@ static Request[] cloneRequests(Request orig, int numClones) throws IOException {  | 
286 | 286 |         return clones;  | 
287 | 287 |     }  | 
288 | 288 | 
 
  | 
 | 289 | +    /**  | 
 | 290 | +     * Run {@link #cloneRequests cloned} requests in parallel.  | 
 | 291 | +     */  | 
 | 292 | +    static Response runInParallel(RestClient localClient, RestClient remoteClient, Request[] clones) throws Throwable {  | 
 | 293 | +        CompletableFuture<Response> remoteResponse = new CompletableFuture<>();  | 
 | 294 | +        CompletableFuture<Response> localResponse = new CompletableFuture<>();  | 
 | 295 | +        remoteClient.performRequestAsync(clones[0], new ResponseListener() {  | 
 | 296 | +            @Override  | 
 | 297 | +            public void onSuccess(Response response) {  | 
 | 298 | +                remoteResponse.complete(response);  | 
 | 299 | +            }  | 
 | 300 | + | 
 | 301 | +            @Override  | 
 | 302 | +            public void onFailure(Exception exception) {  | 
 | 303 | +                remoteResponse.completeExceptionally(exception);  | 
 | 304 | +            }  | 
 | 305 | +        });  | 
 | 306 | +        localClient.performRequestAsync(clones[1], new ResponseListener() {  | 
 | 307 | +            @Override  | 
 | 308 | +            public void onSuccess(Response response) {  | 
 | 309 | +                localResponse.complete(response);  | 
 | 310 | +            }  | 
 | 311 | + | 
 | 312 | +            @Override  | 
 | 313 | +            public void onFailure(Exception exception) {  | 
 | 314 | +                localResponse.completeExceptionally(exception);  | 
 | 315 | +            }  | 
 | 316 | +        });  | 
 | 317 | +        try {  | 
 | 318 | +            Response remote = remoteResponse.get();  | 
 | 319 | +            Response local = localResponse.get();  | 
 | 320 | +            assertEquals(remote.getStatusLine().getStatusCode(), local.getStatusLine().getStatusCode());  | 
 | 321 | +            return local;  | 
 | 322 | +        } catch (ExecutionException e) {  | 
 | 323 | +            throw e.getCause();  | 
 | 324 | +        }  | 
 | 325 | +    }  | 
 | 326 | + | 
289 | 327 |     /**  | 
290 | 328 |      * Convert FROM employees ... => FROM *:employees,employees  | 
291 | 329 |      */  | 
 | 
0 commit comments