diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java
index 765e9d9173e87..c2cefc32e07b4 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java
@@ -17,6 +17,7 @@
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
+import org.apache.lucene.util.IOConsumer;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
@@ -30,6 +31,7 @@
 import org.elasticsearch.inference.TaskType;
 import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
+import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.xcontent.XContentType;
 
@@ -43,6 +45,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Semaphore;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -53,6 +56,7 @@
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.reader;
 
 public class CsvTestsDataLoader {
+    private static final int PARALLEL_THREADS = 10;
     private static final int BULK_DATA_SIZE = 100_000;
     private static final TestDataset EMPLOYEES = new TestDataset("employees", "mapping-default.json", "employees.csv").noSubfields();
     private static final TestDataset EMPLOYEES_INCOMPATIBLE = new TestDataset(
@@ -398,16 +402,35 @@ private static void loadDataSetIntoEs(
         IndexCreator indexCreator
     ) throws IOException {
         Logger logger = LogManager.getLogger(CsvTestsDataLoader.class);
 
+        List<TestDataset> datasets = availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled).stream()
+            .toList();
+
+        executeInParallel(datasets, dataset -> createIndex(client, dataset, indexCreator), "Failed to create indices in parallel");
+
+        executeInParallel(datasets, dataset -> loadData(client, dataset, logger), "Failed to load data in parallel");
+
+        forceMerge(client, datasets.stream().map(d -> d.indexName).collect(Collectors.toSet()), logger);
+
+        executeInParallel(
+            ENRICH_POLICIES,
+            policy -> loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger),
+            "Failed to load enrich policies in parallel"
+        );
-        Set<String> loadedDatasets = new HashSet<>();
-        for (var dataset : availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled)) {
-            load(client, dataset, logger, indexCreator);
-            loadedDatasets.add(dataset.indexName);
-        }
-        forceMerge(client, loadedDatasets, logger);
-        for (var policy : ENRICH_POLICIES) {
-            loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger);
-        }
+    }
+
+    private static <T> void executeInParallel(List<T> items, IOConsumer<T> consumer, String errorMessage) {
+        Semaphore semaphore = new Semaphore(PARALLEL_THREADS);
+        ESTestCase.runInParallel(items.size(), i -> {
+            try {
+                semaphore.acquire();
+                consumer.accept(items.get(i));
+            } catch (IOException | InterruptedException e) {
+                throw new RuntimeException(errorMessage, e);
+            } finally {
+                semaphore.release();
+            }
+        });
     }
 
     public static void createInferenceEndpoints(RestClient client) throws IOException {
@@ -564,11 +587,13 @@ private static URL getResource(String name) {
         return result;
     }
 
-    private static void load(RestClient client, TestDataset dataset, Logger logger, IndexCreator indexCreator) throws IOException {
+    private static void createIndex(RestClient client, TestDataset dataset, IndexCreator indexCreator) throws IOException {
         URL mapping = getResource("/" + dataset.mappingFileName);
         Settings indexSettings = dataset.readSettingsFile();
         indexCreator.createIndex(client, dataset.indexName, readMappingFile(mapping, dataset.typeMapping), indexSettings);
+    }
 
+    private static void loadData(RestClient client, TestDataset dataset, Logger logger) throws IOException {
        // Some examples only test that the query and mappings are valid, and don't need example data. Use .noData() for those
         if (dataset.dataFileName != null) {
             URL data = getResource("/data/" + dataset.dataFileName);
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java b/x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java
index 5b40e1d03e92f..612e007c4b13f 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java
+++ b/x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java
@@ -11,12 +11,18 @@
 
 import java.net.ConnectException;
 
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.startsWith;
 
 public class CsvTestsDataLoaderTests extends ESTestCase {
 
     public void testCsvTestsDataLoaderExecution() {
-        ConnectException ce = expectThrows(ConnectException.class, () -> CsvTestsDataLoader.main(new String[] {}));
-        assertThat(ce.getMessage(), startsWith("Connection refused"));
+        Throwable cause = expectThrows(AssertionError.class, () -> CsvTestsDataLoader.main(new String[] {}));
+        // find the root cause
+        while (cause.getCause() != null) {
+            cause = cause.getCause();
+        }
+        assertThat(cause, instanceOf(ConnectException.class));
+        assertThat(cause.getMessage(), startsWith("Connection refused"));
     }
 }
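
The bounded-parallelism pattern behind the new executeInParallel helper can be reproduced outside the Elasticsearch test framework. The sketch below is illustrative only: BoundedParallelSketch, IOConsumerLike, and the plain-Thread driver are hypothetical stand-ins, since ESTestCase.runInParallel and Lucene's IOConsumer are internal to the Elasticsearch and Lucene codebases. The idea is the same as in the diff: start one task per item, but let a Semaphore seeded with PARALLEL_THREADS permits cap how many tasks actually run at once.

// Standalone sketch, not part of the PR. BoundedParallelSketch and
// IOConsumerLike are invented names; plain threads stand in for
// ESTestCase.runInParallel.
import java.util.List;
import java.util.concurrent.Semaphore;

public class BoundedParallelSketch {
    private static final int PARALLEL_THREADS = 10;

    // Stand-in for org.apache.lucene.util.IOConsumer: a consumer that may
    // throw a checked exception.
    interface IOConsumerLike<T> {
        void accept(T item) throws Exception;
    }

    static <T> void executeInParallel(List<T> items, IOConsumerLike<T> consumer, String errorMessage)
        throws InterruptedException {
        Semaphore semaphore = new Semaphore(PARALLEL_THREADS);
        Thread[] threads = new Thread[items.size()];
        for (int i = 0; i < threads.length; i++) {
            T item = items.get(i);
            threads[i] = new Thread(() -> {
                try {
                    semaphore.acquire();   // blocks while PARALLEL_THREADS tasks are in flight
                    consumer.accept(item);
                } catch (Exception e) {
                    throw new RuntimeException(errorMessage, e);
                } finally {
                    semaphore.release();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();                      // wait for every task before returning
        }
    }

    public static void main(String[] args) throws InterruptedException {
        executeInParallel(
            List.of("employees", "languages", "apps"),
            index -> System.out.println("loading " + index + " on " + Thread.currentThread().getName()),
            "Failed to load test data"
        );
    }
}

The semaphore is what makes firing one task per dataset safe: however many datasets and enrich policies there are, at most PARALLEL_THREADS index creations or bulk loads hit the REST client concurrently at any moment.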