diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java
index 71687afcec6e8..c9f6e66553f00 100644
--- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java
+++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java
@@ -15,6 +15,7 @@
 import org.elasticsearch.Version;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseListener;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.IOUtils;
@@ -40,6 +41,8 @@
 import java.util.Locale;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -247,10 +250,7 @@ static RestClient twoClients(RestClient localClient, RestClient remoteClient) th
                 return bulkClient.performRequest(request);
             } else {
                 Request[] clones = cloneRequests(request, 2);
-                Response resp1 = remoteClient.performRequest(clones[0]);
-                Response resp2 = localClient.performRequest(clones[1]);
-                assertEquals(resp1.getStatusLine().getStatusCode(), resp2.getStatusLine().getStatusCode());
-                return resp2;
+                return runInParallel(localClient, remoteClient, clones);
             }
         });
         doAnswer(invocation -> {
@@ -285,6 +285,44 @@ static Request[] cloneRequests(Request orig, int numClones) throws IOException {
         return clones;
     }
 
+    /**
+     * Run {@link #cloneRequests cloned} requests in parallel.
+     */
+    static Response runInParallel(RestClient localClient, RestClient remoteClient, Request[] clones) throws Throwable {
+        CompletableFuture<Response> remoteResponse = new CompletableFuture<>();
+        CompletableFuture<Response> localResponse = new CompletableFuture<>();
+        remoteClient.performRequestAsync(clones[0], new ResponseListener() {
+            @Override
+            public void onSuccess(Response response) {
+                remoteResponse.complete(response);
+            }
+
+            @Override
+            public void onFailure(Exception exception) {
+                remoteResponse.completeExceptionally(exception);
+            }
+        });
+        localClient.performRequestAsync(clones[1], new ResponseListener() {
+            @Override
+            public void onSuccess(Response response) {
+                localResponse.complete(response);
+            }
+
+            @Override
+            public void onFailure(Exception exception) {
+                localResponse.completeExceptionally(exception);
+            }
+        });
+        try {
+            Response remote = remoteResponse.get();
+            Response local = localResponse.get();
+            assertEquals(remote.getStatusLine().getStatusCode(), local.getStatusLine().getStatusCode());
+            return local;
+        } catch (ExecutionException e) {
+            throw e.getCause();
+        }
+    }
+
     /**
      * Convert FROM employees ... => FROM *:employees,employees
      */
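Note on runInParallel above: the low-level REST client's performRequestAsync takes a ResponseListener callback rather than returning a future, so each call is bridged into a CompletableFuture and the two futures are joined afterwards, letting both clusters serve the cloned request concurrently. The same bridging could be factored into a reusable helper; a minimal sketch (the name asFuture is hypothetical, not part of this patch, and it assumes the imports already present in this file):

    // Hypothetical helper: adapts the callback-style async API to a CompletableFuture.
    static CompletableFuture<Response> asFuture(RestClient client, Request request) {
        CompletableFuture<Response> future = new CompletableFuture<>();
        client.performRequestAsync(request, new ResponseListener() {
            @Override
            public void onSuccess(Response response) {
                future.complete(response);
            }

            @Override
            public void onFailure(Exception exception) {
                future.completeExceptionally(exception);
            }
        });
        return future;
    }

With such a helper, runInParallel would reduce to two asFuture calls followed by the status-code assertion on the joined results.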
diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlSpecTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlSpecTestCase.java
index 2ecd089dedd2f..bee67b1486192 100644
--- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlSpecTestCase.java
+++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlSpecTestCase.java
@@ -51,6 +51,7 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
@@ -122,23 +123,59 @@ protected EsqlSpecTestCase(
         this.mode = randomFrom(Mode.values());
     }
 
-    private static boolean dataLoaded = false;
+    private static class Protected {
+        private volatile boolean completed = false;
+        private volatile boolean started = false;
+        private volatile Throwable failure = null;
+
+        private void protectedBlock(Callable<Void> callable) {
+            if (completed) {
+                return;
+            }
+            // In case tests get run in parallel, we ensure only one setup is run, and other tests wait for this
+            synchronized (this) {
+                if (completed) {
+                    return;
+                }
+                if (started) {
+                    // Should only happen if a previous test setup failed, possibly with partial setup, let's fail fast the current test
+                    if (failure != null) {
+                        fail(failure, "Previous test setup failed: " + failure.getMessage());
+                    }
+                    fail("Previous test setup failed with unknown error");
+                }
+                started = true;
+                try {
+                    callable.call();
+                    completed = true;
+                } catch (Throwable t) {
+                    failure = t;
+                    fail(failure, "Current test setup failed: " + failure.getMessage());
+                }
+            }
+        }
+
+        private synchronized void reset() {
+            completed = false;
+            started = false;
+            failure = null;
+        }
+    }
+
+    private static final Protected INGEST = new Protected();
     protected static boolean testClustersOk = true;
 
     @Before
-    public void setup() throws IOException {
+    public void setup() {
         assumeTrue("test clusters were broken", testClustersOk);
-        boolean supportsLookup = supportsIndexModeLookup();
-        boolean supportsSourceMapping = supportsSourceFieldMapping();
-        boolean supportsInferenceTestService = supportsInferenceTestService();
-        if (dataLoaded == false) {
-            if (supportsInferenceTestService) {
+        INGEST.protectedBlock(() -> {
+            // Inference endpoints must be created before ingesting any datasets that rely on them (mapping of inference_id)
+            if (supportsInferenceTestService()) {
                 createInferenceEndpoints(adminClient());
             }
-
-            loadDataSetIntoEs(client(), supportsLookup, supportsSourceMapping, supportsInferenceTestService);
-            dataLoaded = true;
-        }
+            loadDataSetIntoEs(client(), supportsIndexModeLookup(), supportsSourceFieldMapping(), supportsInferenceTestService());
+            return null;
+        });
     }
 
     @AfterClass
@@ -147,7 +184,6 @@ public static void wipeTestData() throws IOException {
             return;
         }
         try {
-            dataLoaded = false;
             adminClient().performRequest(new Request("DELETE", "/*"));
         } catch (ResponseException e) {
             // 404 here just means we had no indexes
@@ -155,7 +191,7 @@ public static void wipeTestData() throws IOException {
                 throw e;
             }
         }
-
+        INGEST.reset();
         deleteInferenceEndpoints(adminClient());
     }
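The Protected holder above is a double-checked, run-once guard around data loading: the volatile completed flag gives a lock-free fast path once setup has succeeded, the synchronized block ensures only one test thread performs the ingest while concurrent tests wait on the monitor, and started/failure let later tests fail fast instead of retrying a partially completed setup. The same idiom in isolation looks roughly like this (class and method names are illustrative, not from the patch):

    // Illustrative run-once guard with fail-fast semantics.
    class OnceGuard {
        private volatile boolean done = false;
        private Throwable failure = null; // guarded by the monitor

        void runOnce(Runnable setup) {
            if (done) {
                return; // fast path: setup already succeeded, no lock needed
            }
            synchronized (this) {
                if (done) {
                    return;
                }
                if (failure != null) {
                    // an earlier attempt failed, possibly leaving partial state behind
                    throw new IllegalStateException("earlier setup failed", failure);
                }
                try {
                    setup.run();
                    done = true;
                } catch (Throwable t) {
                    failure = t;
                    throw t; // precise rethrow: only unchecked exceptions can reach here
                }
            }
        }
    }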
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java
index 96f8ffdcccbbe..6ad1223b72d5c 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java
@@ -17,6 +17,7 @@
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
+import org.apache.lucene.util.IOConsumer;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
@@ -30,6 +31,7 @@
 import org.elasticsearch.inference.TaskType;
 import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
+import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.xcontent.XContentType;
@@ -43,6 +45,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Semaphore;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -55,6 +58,7 @@
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.reader;
 
 public class CsvTestsDataLoader {
+    private static final int PARALLEL_THREADS = 10;
     private static final int BULK_DATA_SIZE = 100_000;
     private static final TestDataset EMPLOYEES = new TestDataset("employees", "mapping-default.json", "employees.csv").noSubfields();
     private static final TestDataset EMPLOYEES_INCOMPATIBLE = new TestDataset(
@@ -421,16 +425,39 @@ private static void loadDataSetIntoEs(
         IndexCreator indexCreator
     ) throws IOException {
         Logger logger = LogManager.getLogger(CsvTestsDataLoader.class);
 
+        List<TestDataset> datasets = availableDatasetsForEs(
+            supportsIndexModeLookup,
+            supportsSourceFieldMapping,
+            inferenceEnabled,
+            timeSeriesOnly
+        ).stream().toList();
+
+        executeInParallel(datasets, dataset -> createIndex(client, dataset, indexCreator), "Failed to create indices in parallel");
+
+        executeInParallel(datasets, dataset -> loadData(client, dataset, logger), "Failed to load data in parallel");
+
+        forceMerge(client, datasets.stream().map(d -> d.indexName).collect(Collectors.toSet()), logger);
+
+        executeInParallel(
+            ENRICH_POLICIES,
+            policy -> loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger),
+            "Failed to load enrich policies in parallel"
+        );
-        Set<String> loadedDatasets = new HashSet<>();
-        for (var dataset : availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled, timeSeriesOnly)) {
-            load(client, dataset, logger, indexCreator);
-            loadedDatasets.add(dataset.indexName);
-        }
-        forceMerge(client, loadedDatasets, logger);
-        for (var policy : ENRICH_POLICIES) {
-            loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger);
-        }
+    }
+
+    private static <T> void executeInParallel(List<T> items, IOConsumer<T> consumer, String errorMessage) {
+        Semaphore semaphore = new Semaphore(PARALLEL_THREADS);
+        ESTestCase.runInParallel(items.size(), i -> {
+            try {
+                semaphore.acquire();
+                consumer.accept(items.get(i));
+            } catch (IOException | InterruptedException e) {
+                throw new RuntimeException(errorMessage, e);
+            } finally {
+                semaphore.release();
+            }
+        });
     }
 
     public static void createInferenceEndpoints(RestClient client) throws IOException {
@@ -587,11 +614,13 @@ private static URL getResource(String name) {
         return result;
     }
 
-    private static void load(RestClient client, TestDataset dataset, Logger logger, IndexCreator indexCreator) throws IOException {
+    private static void createIndex(RestClient client, TestDataset dataset, IndexCreator indexCreator) throws IOException {
        URL mapping = getResource("/" + dataset.mappingFileName);
         Settings indexSettings = dataset.readSettingsFile();
         indexCreator.createIndex(client, dataset.indexName, readMappingFile(mapping, dataset.typeMapping), indexSettings);
+    }
 
+    private static void loadData(RestClient client, TestDataset dataset, Logger logger) throws IOException {
         // Some examples only test that the query and mappings are valid, and don't need example data. Use .noData() for those
         if (dataset.dataFileName != null) {
             URL data = getResource("/data/" + dataset.dataFileName);
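The loader now creates all indices, ingests all datasets, and installs enrich policies in parallel, with a Semaphore capping effective concurrency at PARALLEL_THREADS (10) so the test cluster is not flooded regardless of how many workers ESTestCase.runInParallel spins up. Note the ordering preserved above: every index exists before any data is loaded, and force-merge runs only after all bulk loads have finished. The same bounded-concurrency idiom, sketched with plain threads rather than the test-framework helper (boundedParallel is a hypothetical name; assumes java.util, java.util.concurrent.Semaphore, and java.util.function.Consumer imports):

    // Sketch: one thread per item, with a semaphore limiting how many run at once.
    static <T> void boundedParallel(List<T> items, int permits, Consumer<T> task) throws InterruptedException {
        Semaphore semaphore = new Semaphore(permits);
        List<Thread> threads = new ArrayList<>();
        for (T item : items) {
            Thread thread = new Thread(() -> {
                try {
                    semaphore.acquire();
                    try {
                        task.accept(item);
                    } finally {
                        semaphore.release();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
    }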
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java b/x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java
index 5b40e1d03e92f..612e007c4b13f 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java
+++ b/x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java
@@ -11,12 +11,18 @@
 
 import java.net.ConnectException;
 
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.startsWith;
 
 public class CsvTestsDataLoaderTests extends ESTestCase {
 
     public void testCsvTestsDataLoaderExecution() {
-        ConnectException ce = expectThrows(ConnectException.class, () -> CsvTestsDataLoader.main(new String[] {}));
-        assertThat(ce.getMessage(), startsWith("Connection refused"));
+        Throwable cause = expectThrows(AssertionError.class, () -> CsvTestsDataLoader.main(new String[] {}));
+        // find the root cause
+        while (cause.getCause() != null) {
+            cause = cause.getCause();
+        }
+        assertThat(cause, instanceOf(ConnectException.class));
+        assertThat(cause.getMessage(), startsWith("Connection refused"));
     }
 }
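With loading routed through executeInParallel, a refused connection no longer surfaces as a bare ConnectException: it arrives wrapped (the RuntimeException thrown by executeInParallel inside the error the parallel runner raises), so the test walks getCause() down to the root before asserting. If the unwrapping were needed in more than one test, it could be extracted into a small helper (rootCause is a hypothetical name, not part of this patch):

    // Walk the cause chain down to the root exception.
    static Throwable rootCause(Throwable t) {
        Throwable cause = t;
        while (cause.getCause() != null) {
            cause = cause.getCause();
        }
        return cause;
    }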