From 3d192c6e68cd7bb1fdc6b96e805bf6ff833d696e Mon Sep 17 00:00:00 2001
From: Julian Kiryakov
Date: Wed, 3 Sep 2025 16:25:18 -0400
Subject: [PATCH 1/4] Parallelize CsvTestsDataLoader

---
 .../xpack/esql/CsvTestsDataLoader.java | 91 ++++++++++++++++++-
 1 file changed, 89 insertions(+), 2 deletions(-)

diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java
index c56ed4d489843..315457e702993 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java
@@ -17,6 +17,7 @@
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
@@ -43,6 +44,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -53,6 +57,7 @@
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.reader;
 
 public class CsvTestsDataLoader {
+    private static final int PARALLEL_THREADS = 10;
     private static final int BULK_DATA_SIZE = 100_000;
     private static final TestDataset EMPLOYEES = new TestDataset("employees", "mapping-default.json", "employees.csv").noSubfields();
     private static final TestDataset EMPLOYEES_INCOMPATIBLE = new TestDataset(
@@ -396,12 +401,27 @@ private static void loadDataSetIntoEs(
         boolean supportsSourceFieldMapping,
         boolean inferenceEnabled,
         IndexCreator indexCreator
+    ) throws IOException {
+        if (PARALLEL_THREADS > 1) {
+            loadDataSetIntoEsParallel(client, supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled, indexCreator);
+        } else {
+            loadDataSetIntoEsSequential(client, supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled, indexCreator);
+        }
+    }
+
+    private static void loadDataSetIntoEsSequential(
+        RestClient client,
+        boolean supportsIndexModeLookup,
+        boolean supportsSourceFieldMapping,
+        boolean inferenceEnabled,
+        IndexCreator indexCreator
     ) throws IOException {
         Logger logger = LogManager.getLogger(CsvTestsDataLoader.class);
 
         Set<String> loadedDatasets = new HashSet<>();
         for (var dataset : availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled)) {
-            load(client, dataset, logger, indexCreator);
+            createIndex(client, dataset, indexCreator);
+            loadData(client, dataset, logger);
             loadedDatasets.add(dataset.indexName);
         }
         forceMerge(client, loadedDatasets, logger);
@@ -410,6 +430,71 @@ private static void loadDataSetIntoEs(
         }
     }
 
+    private static void loadDataSetIntoEsParallel(
+        RestClient client,
+        boolean supportsIndexModeLookup,
+        boolean supportsSourceFieldMapping,
+        boolean inferenceEnabled,
+        IndexCreator indexCreator
+    ) throws IOException {
+        Logger logger = LogManager.getLogger(CsvTestsDataLoader.class);
+        Set<TestDataset> datasets = availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled);
+        ExecutorService executor = Executors.newFixedThreadPool(PARALLEL_THREADS);
+        try {
+            executeInParallel(
+                executor,
+                datasets,
+                dataset -> createIndex(client, dataset, indexCreator),
+                "Failed to create indices in parallel"
+            );
+
+            executeInParallel(executor, datasets, dataset -> loadData(client, dataset, logger), "Failed to load data in parallel");
+
+            forceMerge(client, datasets.stream().map(d -> d.indexName).collect(Collectors.toSet()), logger);
+
+            executeInParallel(
+                executor,
+                ENRICH_POLICIES,
+                policy -> loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger),
+                "Failed to load enrich policies in parallel"
+            );
+        } finally {
+            executor.shutdown();
+        }
+    }
+
+    @FunctionalInterface
+    private interface IOConsumer<T> {
+        void accept(T t) throws IOException;
+    }
+
+    private static <T> void executeInParallel(ExecutorService executor, Iterable<T> items, IOConsumer<T> consumer, String errorMessage)
+        throws IOException {
+        List<Future<?>> futures = new ArrayList<>();
+        for (T item : items) {
+            futures.add(executor.submit(() -> {
+                try {
+                    consumer.accept(item);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }));
+        }
+
+        RuntimeException exception = null;
+        for (Future<?> future : futures) {
+            try {
+                future.get();
+            } catch (Exception e) {
+                exception = ExceptionsHelper.useOrSuppress(exception, ExceptionsHelper.convertToRuntime(e));
+            }
+        }
+
+        if (exception != null) {
+            throw new IOException(errorMessage, exception);
+        }
+    }
+
     public static void createInferenceEndpoints(RestClient client) throws IOException {
         if (clusterHasSparseEmbeddingInferenceEndpoint(client) == false) {
             createSparseEmbeddingInferenceEndpoint(client);
@@ -535,11 +620,13 @@ private static URL getResource(String name) {
         return result;
     }
 
-    private static void load(RestClient client, TestDataset dataset, Logger logger, IndexCreator indexCreator) throws IOException {
+    private static void createIndex(RestClient client, TestDataset dataset, IndexCreator indexCreator) throws IOException {
        URL mapping = getResource("/" + dataset.mappingFileName);
         Settings indexSettings = dataset.readSettingsFile();
         indexCreator.createIndex(client, dataset.indexName, readMappingFile(mapping, dataset.typeMapping), indexSettings);
+    }
 
+    private static void loadData(RestClient client, TestDataset dataset, Logger logger) throws IOException {
         // Some examples only test that the query and mappings are valid, and don't need example data. Use .noData() for those
         if (dataset.dataFileName != null) {
             URL data = getResource("/data/" + dataset.dataFileName);
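Patch 1 above introduces an executeInParallel helper that submits one task per dataset to a fixed-size pool and folds any failures into a single IOException. A rough standalone sketch of that pattern, using only JDK types (class and method names here are illustrative, and plain suppressed exceptions stand in for ExceptionsHelper.useOrSuppress):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative sketch only: fan items out over a fixed-size pool, wait for every
// task, and aggregate failures into one IOException via suppressed exceptions.
final class ParallelLoadSketch {

    @FunctionalInterface
    interface IOConsumer<T> {
        void accept(T t) throws IOException;
    }

    static <T> void executeInParallel(ExecutorService executor, Iterable<T> items, IOConsumer<T> consumer, String errorMessage)
        throws IOException {
        List<Future<?>> futures = new ArrayList<>();
        for (T item : items) {
            futures.add(executor.submit(() -> {
                try {
                    consumer.accept(item);
                } catch (IOException e) {
                    throw new UncheckedIOException(e); // resurfaces when the future is joined
                }
            }));
        }

        Exception failure = null;
        for (Future<?> future : futures) {
            try {
                future.get(); // a failed task surfaces here as an ExecutionException
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw new IOException(errorMessage, failure);
        }
    }

    public static void main(String[] args) throws IOException {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        try {
            executeInParallel(
                executor,
                List.of("employees", "languages", "apps"),
                name -> System.out.println("loaded " + name),
                "Failed to load data in parallel"
            );
        } finally {
            executor.shutdown();
        }
    }
}

As in the patch, the same executor is reused for index creation, bulk loading, and enrich-policy loading, and is only shut down once all phases have completed.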
From 8d30e9fee03e98676393b94c6c3850372654ed80 Mon Sep 17 00:00:00 2001
From: Julian Kiryakov
Date: Thu, 4 Sep 2025 11:48:17 -0400
Subject: [PATCH 2/4] Address code review feedback

---
 .../xpack/esql/CsvTestsDataLoader.java | 41 +------------------
 1 file changed, 1 insertion(+), 40 deletions(-)

diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java
index 315457e702993..f5f17302c6ecc 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java
@@ -17,6 +17,7 @@
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
+import org.apache.lucene.util.IOConsumer;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
@@ -401,41 +402,6 @@ private static void loadDataSetIntoEs(
         boolean supportsSourceFieldMapping,
         boolean inferenceEnabled,
         IndexCreator indexCreator
-    ) throws IOException {
-        if (PARALLEL_THREADS > 1) {
-            loadDataSetIntoEsParallel(client, supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled, indexCreator);
-        } else {
-            loadDataSetIntoEsSequential(client, supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled, indexCreator);
-        }
-    }
-
-    private static void loadDataSetIntoEsSequential(
-        RestClient client,
-        boolean supportsIndexModeLookup,
-        boolean supportsSourceFieldMapping,
-        boolean inferenceEnabled,
-        IndexCreator indexCreator
-    ) throws IOException {
-        Logger logger = LogManager.getLogger(CsvTestsDataLoader.class);
-
-        Set<String> loadedDatasets = new HashSet<>();
-        for (var dataset : availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled)) {
-            createIndex(client, dataset, indexCreator);
-            loadData(client, dataset, logger);
-            loadedDatasets.add(dataset.indexName);
-        }
-        forceMerge(client, loadedDatasets, logger);
-        for (var policy : ENRICH_POLICIES) {
-            loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger);
-        }
-    }
-
-    private static void loadDataSetIntoEsParallel(
-        RestClient client,
-        boolean supportsIndexModeLookup,
-        boolean supportsSourceFieldMapping,
-        boolean inferenceEnabled,
-        IndexCreator indexCreator
     ) throws IOException {
         Logger logger = LogManager.getLogger(CsvTestsDataLoader.class);
         Set<TestDataset> datasets = availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled);
         ExecutorService executor = Executors.newFixedThreadPool(PARALLEL_THREADS);
         try {
             executeInParallel(
@@ -463,11 +429,6 @@ private static void loadDataSetIntoEsParallel(
         }
     }
 
-    @FunctionalInterface
-    private interface IOConsumer<T> {
-        void accept(T t) throws IOException;
-    }
-
     private static <T> void executeInParallel(ExecutorService executor, Iterable<T> items, IOConsumer<T> consumer, String errorMessage)
         throws IOException {
         List<Future<?>> futures = new ArrayList<>();
From 5214c3dbd40e9cac5a8aeee0535940e8421eeeb9 Mon Sep 17 00:00:00 2001
From: Julian Kiryakov
Date: Thu, 4 Sep 2025 13:23:45 -0400
Subject: [PATCH 3/4] Refactor to use ESTestCase.runInParallel()

---
 .../xpack/esql/CsvTestsDataLoader.java | 71 +++++++------------
 1 file changed, 24 insertions(+), 47 deletions(-)

diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java
index f5f17302c6ecc..bcc14e789a6eb 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java
@@ -18,7 +18,6 @@
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
 import org.apache.lucene.util.IOConsumer;
-import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
@@ -32,6 +31,7 @@
 import org.elasticsearch.inference.TaskType;
 import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
+import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.xcontent.XContentType;
 
@@ -45,9 +45,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -404,56 +402,35 @@ private static void loadDataSetIntoEs(
         IndexCreator indexCreator
     ) throws IOException {
         Logger logger = LogManager.getLogger(CsvTestsDataLoader.class);
-        Set<TestDataset> datasets = availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled);
-        ExecutorService executor = Executors.newFixedThreadPool(PARALLEL_THREADS);
-        try {
-            executeInParallel(
-                executor,
-                datasets,
-                dataset -> createIndex(client, dataset, indexCreator),
-                "Failed to create indices in parallel"
-            );
+        List<TestDataset> datasets = availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled).stream()
+            .toList();
 
-            executeInParallel(executor, datasets, dataset -> loadData(client, dataset, logger), "Failed to load data in parallel");
+        executeInParallel(datasets, dataset -> createIndex(client, dataset, indexCreator), "Failed to create indices in parallel");
 
-            forceMerge(client, datasets.stream().map(d -> d.indexName).collect(Collectors.toSet()), logger);
+        executeInParallel(datasets, dataset -> loadData(client, dataset, logger), "Failed to load data in parallel");
 
-            executeInParallel(
-                executor,
-                ENRICH_POLICIES,
-                policy -> loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger),
-                "Failed to load enrich policies in parallel"
-            );
-        } finally {
-            executor.shutdown();
-        }
-    }
+        forceMerge(client, datasets.stream().map(d -> d.indexName).collect(Collectors.toSet()), logger);
 
-    private static <T> void executeInParallel(ExecutorService executor, Iterable<T> items, IOConsumer<T> consumer, String errorMessage)
-        throws IOException {
-        List<Future<?>> futures = new ArrayList<>();
-        for (T item : items) {
-            futures.add(executor.submit(() -> {
-                try {
-                    consumer.accept(item);
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }));
-        }
+        executeInParallel(
+            ENRICH_POLICIES,
+            policy -> loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger),
+            "Failed to load enrich policies in parallel"
+        );
+
+    }
 
-        RuntimeException exception = null;
-        for (Future<?> future : futures) {
+    private static <T> void executeInParallel(List<T> items, IOConsumer<T> consumer, String errorMessage) {
+        Semaphore semaphore = new Semaphore(PARALLEL_THREADS);
+        ESTestCase.runInParallel(items.size(), i -> {
             try {
-                future.get();
-            } catch (Exception e) {
-                exception = ExceptionsHelper.useOrSuppress(exception, ExceptionsHelper.convertToRuntime(e));
+                semaphore.acquire();
+                consumer.accept(items.get(i));
+            } catch (IOException | InterruptedException e) {
+                throw new RuntimeException(errorMessage, e);
+            } finally {
+                semaphore.release();
             }
-        }
-
-        if (exception != null) {
-            throw new IOException(errorMessage, exception);
-        }
+        });
     }
 
     public static void createInferenceEndpoints(RestClient client) throws IOException {

From 6d09a10dafd60b9025cdc1cb26d6658b88165839 Mon Sep 17 00:00:00 2001
From: Julian Kiryakov
Date: Thu, 4 Sep 2025 13:38:22 -0400
Subject: [PATCH 4/4] Fix UT failure

---
 .../xpack/esql/CsvTestsDataLoaderTests.java | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java b/x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java
index 5b40e1d03e92f..612e007c4b13f 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java
+++ b/x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java
@@ -11,12 +11,18 @@
 
 import java.net.ConnectException;
 
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.startsWith;
 
 public class CsvTestsDataLoaderTests extends ESTestCase {
 
     public void testCsvTestsDataLoaderExecution() {
-        ConnectException ce = expectThrows(ConnectException.class, () -> CsvTestsDataLoader.main(new String[] {}));
-        assertThat(ce.getMessage(), startsWith("Connection refused"));
+        Throwable cause = expectThrows(AssertionError.class, () -> CsvTestsDataLoader.main(new String[] {}));
+        // find the root cause
+        while (cause.getCause() != null) {
+            cause = cause.getCause();
+        }
+        assertThat(cause, instanceOf(ConnectException.class));
+        assertThat(cause.getMessage(), startsWith("Connection refused"));
     }
 }
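After patches 3 and 4, executeInParallel no longer manages its own thread pool: ESTestCase.runInParallel starts the tasks and waits for them, while a Semaphore with PARALLEL_THREADS permits bounds how many run at once, and failures are rethrown as RuntimeException. A rough self-contained sketch of that shape follows; the runInParallel stand-in, the class name, and the IOConsumer interface here are illustrative, not the real ESTestCase or Lucene APIs.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.function.IntConsumer;

// Illustrative sketch only: the runner may start one task per item, but a semaphore
// caps how many of them do real work at once, mirroring the final executeInParallel.
final class SemaphoreBoundedLoadSketch {

    private static final int PARALLEL_THREADS = 10;

    @FunctionalInterface
    interface IOConsumer<T> {
        void accept(T t) throws IOException;
    }

    static <T> void executeInParallel(List<T> items, IOConsumer<T> consumer, String errorMessage) {
        Semaphore semaphore = new Semaphore(PARALLEL_THREADS);
        runInParallel(items.size(), i -> {
            try {
                semaphore.acquire();               // at most PARALLEL_THREADS items in flight
                consumer.accept(items.get(i));
            } catch (IOException | InterruptedException e) {
                throw new RuntimeException(errorMessage, e);
            } finally {
                semaphore.release();
            }
        });
    }

    // Stand-in for ESTestCase.runInParallel: run `tasks` indexed jobs and wait for all of them.
    static void runInParallel(int tasks, IntConsumer job) {
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, tasks));
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                int index = i;
                futures.add(executor.submit(() -> job.accept(index)));
            }
            for (Future<?> future : futures) {
                try {
                    future.get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        executeInParallel(
            List.of("employees", "languages", "apps"),
            name -> System.out.println("loading " + name),
            "Failed to load data in parallel"
        );
    }
}

Because failures now surface wrapped by the parallel runner rather than as a bare ConnectException, the unit test in patch 4 walks getCause() down to the root cause before asserting on its type and message.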