diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java
index 649ce00ef53ec..7138b0efe53ce 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java
@@ -17,7 +17,6 @@
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
-import org.apache.lucene.util.IOConsumer;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
@@ -31,7 +30,6 @@
 import org.elasticsearch.inference.TaskType;
 import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
-import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.xcontent.XContentType;
 
@@ -45,7 +43,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Semaphore;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -58,7 +55,6 @@
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.reader;
 
 public class CsvTestsDataLoader {
-    private static final int PARALLEL_THREADS = 10;
     private static final int BULK_DATA_SIZE = 100_000;
     private static final TestDataset EMPLOYEES = new TestDataset("employees", "mapping-default.json", "employees.csv").noSubfields();
     private static final TestDataset EMPLOYEES_INCOMPATIBLE = new TestDataset(
@@ -404,35 +400,16 @@ private static void loadDataSetIntoEs(
         IndexCreator indexCreator
     ) throws IOException {
         Logger logger = LogManager.getLogger(CsvTestsDataLoader.class);
 
-        List<TestDataset> datasets = availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled).stream()
-            .toList();
-        executeInParallel(datasets, dataset -> createIndex(client, dataset, indexCreator), "Failed to create indices in parallel");
-
-        executeInParallel(datasets, dataset -> loadData(client, dataset, logger), "Failed to load data in parallel");
-
-        forceMerge(client, datasets.stream().map(d -> d.indexName).collect(Collectors.toSet()), logger);
-
-        executeInParallel(
-            ENRICH_POLICIES,
-            policy -> loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger),
-            "Failed to load enrich policies in parallel"
-        );
-
-    }
-
-    private static <T> void executeInParallel(List<T> items, IOConsumer<T> consumer, String errorMessage) {
-        Semaphore semaphore = new Semaphore(PARALLEL_THREADS);
-        ESTestCase.runInParallel(items.size(), i -> {
-            try {
-                semaphore.acquire();
-                consumer.accept(items.get(i));
-            } catch (IOException | InterruptedException e) {
-                throw new RuntimeException(errorMessage, e);
-            } finally {
-                semaphore.release();
-            }
-        });
+        Set<String> loadedDatasets = new HashSet<>();
+        for (var dataset : availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled)) {
+            load(client, dataset, logger, indexCreator);
+            loadedDatasets.add(dataset.indexName);
+        }
+        forceMerge(client, loadedDatasets, logger);
+        for (var policy : ENRICH_POLICIES) {
+            loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger);
+        }
     }
 
     public static void createInferenceEndpoints(RestClient client) throws IOException {
@@ -589,13 +566,11 @@ private static URL getResource(String name) {
         return result;
     }
 
-    private static void createIndex(RestClient client, TestDataset dataset, IndexCreator indexCreator) throws IOException {
+    private static void load(RestClient client, TestDataset dataset, Logger logger, IndexCreator indexCreator) throws IOException {
         URL mapping = getResource("/" + dataset.mappingFileName);
         Settings indexSettings = dataset.readSettingsFile();
         indexCreator.createIndex(client, dataset.indexName, readMappingFile(mapping, dataset.typeMapping), indexSettings);
-    }
 
-    private static void loadData(RestClient client, TestDataset dataset, Logger logger) throws IOException {
         // Some examples only test that the query and mappings are valid, and don't need example data. Use .noData() for those
         if (dataset.dataFileName != null) {
             URL data = getResource("/data/" + dataset.dataFileName);
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java b/x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java
index 612e007c4b13f..5b40e1d03e92f 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java
+++ b/x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java
@@ -11,18 +11,12 @@
 
 import java.net.ConnectException;
 
-import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.startsWith;
 
 public class CsvTestsDataLoaderTests extends ESTestCase {
 
     public void testCsvTestsDataLoaderExecution() {
-        Throwable cause = expectThrows(AssertionError.class, () -> CsvTestsDataLoader.main(new String[] {}));
-        // find the root cause
-        while (cause.getCause() != null) {
-            cause = cause.getCause();
-        }
-        assertThat(cause, instanceOf(ConnectException.class));
-        assertThat(cause.getMessage(), startsWith("Connection refused"));
+        ConnectException ce = expectThrows(ConnectException.class, () -> CsvTestsDataLoader.main(new String[] {}));
+        assertThat(ce.getMessage(), startsWith("Connection refused"));
     }
 }