From 8d2e553b33b9fba128d847b598c9f38e855ca421 Mon Sep 17 00:00:00 2001 From: Craig Taverner Date: Mon, 3 Nov 2025 14:42:38 +0100 Subject: [PATCH 1/2] Revert "Re-enable some performance updates to ES|QL spec tests after resilience improvement to EsqlSpecTestCase (#136781)" This reverts commit 4d3e27f1958b915a0b512309be4966a4cb08586e. --- .../xpack/esql/ccq/MultiClusterSpecIT.java | 46 ++-------------- .../xpack/esql/CsvTestsDataLoader.java | 52 ++++--------------- .../xpack/esql/CsvTestsDataLoaderTests.java | 10 +--- 3 files changed, 17 insertions(+), 91 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index 1e7879154ad75..5f5408d7fa189 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -15,7 +15,6 @@ import org.elasticsearch.Version; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; -import org.elasticsearch.client.ResponseListener; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.IOUtils; @@ -39,8 +38,6 @@ import java.util.List; import java.util.Locale; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -254,7 +251,10 @@ static RestClient twoClients(RestClient localClient, RestClient remoteClient) th return bulkClient.performRequest(request); } else { Request[] clones = cloneRequests(request, 2); - return runInParallel(localClient, remoteClient, clones); + Response resp1 = remoteClient.performRequest(clones[0]); + Response resp2 = localClient.performRequest(clones[1]); + assertEquals(resp1.getStatusLine().getStatusCode(), resp2.getStatusLine().getStatusCode()); + return resp2; } }); doAnswer(invocation -> { @@ -289,44 +289,6 @@ static Request[] cloneRequests(Request orig, int numClones) throws IOException { return clones; } - /** - * Run {@link #cloneRequests cloned} requests in parallel. - */ - static Response runInParallel(RestClient localClient, RestClient remoteClient, Request[] clones) throws Throwable { - CompletableFuture remoteResponse = new CompletableFuture<>(); - CompletableFuture localResponse = new CompletableFuture<>(); - remoteClient.performRequestAsync(clones[0], new ResponseListener() { - @Override - public void onSuccess(Response response) { - remoteResponse.complete(response); - } - - @Override - public void onFailure(Exception exception) { - remoteResponse.completeExceptionally(exception); - } - }); - localClient.performRequestAsync(clones[1], new ResponseListener() { - @Override - public void onSuccess(Response response) { - localResponse.complete(response); - } - - @Override - public void onFailure(Exception exception) { - localResponse.completeExceptionally(exception); - } - }); - try { - Response remote = remoteResponse.get(); - Response local = localResponse.get(); - assertEquals(remote.getStatusLine().getStatusCode(), local.getStatusLine().getStatusCode()); - return local; - } catch (ExecutionException e) { - throw e.getCause(); - } - } - /** * Convert FROM employees ... => FROM *:employees,employees */ diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java index 369a5ea61d9e0..d056050f8fe97 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java @@ -17,7 +17,6 @@ import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.logging.log4j.core.config.plugins.util.PluginManager; -import org.apache.lucene.util.IOConsumer; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; @@ -31,7 +30,6 @@ import org.elasticsearch.inference.TaskType; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; -import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xcontent.XContentType; @@ -45,7 +43,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Semaphore; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -58,7 +55,6 @@ import static org.elasticsearch.xpack.esql.EsqlTestUtils.reader; public class CsvTestsDataLoader { - private static final int PARALLEL_THREADS = 10; private static final int BULK_DATA_SIZE = 100_000; private static final TestDataset EMPLOYEES = new TestDataset("employees", "mapping-default.json", "employees.csv").noSubfields(); private static final TestDataset EMPLOYEES_INCOMPATIBLE = new TestDataset( @@ -433,42 +429,18 @@ private static void loadDataSetIntoEs( IndexCreator indexCreator ) throws IOException { Logger logger = LogManager.getLogger(CsvTestsDataLoader.class); - List datasets = availableDatasetsForEs( - supportsIndexModeLookup, - supportsSourceFieldMapping, - inferenceEnabled, - timeSeriesOnly - ).stream().toList(); - - logger.info("Creating test indices"); - executeInParallel(datasets, dataset -> createIndex(client, dataset, indexCreator), "Failed to create indices in parallel"); + Set loadedDatasets = new HashSet<>(); logger.info("Loading test datasets"); - executeInParallel(datasets, dataset -> loadData(client, dataset, logger), "Failed to load data in parallel"); - - forceMerge(client, datasets.stream().map(d -> d.indexName).collect(Collectors.toSet()), logger); - + for (var dataset : availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled, timeSeriesOnly)) { + load(client, dataset, logger, indexCreator); + loadedDatasets.add(dataset.indexName); + } + forceMerge(client, loadedDatasets, logger); logger.info("Loading enrich policies"); - executeInParallel( - ENRICH_POLICIES, - policy -> loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger), - "Failed to load enrich policies in parallel" - ); - - } - - private static void executeInParallel(List items, IOConsumer consumer, String errorMessage) { - Semaphore semaphore = new Semaphore(PARALLEL_THREADS); - ESTestCase.runInParallel(items.size(), i -> { - try { - semaphore.acquire(); - consumer.accept(items.get(i)); - } catch (IOException | InterruptedException e) { - throw new RuntimeException(errorMessage, e); - } finally { - semaphore.release(); - } - }); + for (var policy : ENRICH_POLICIES) { + loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger); + } } public static void createInferenceEndpoints(RestClient client) throws IOException { @@ -626,14 +598,12 @@ private static URL getResource(String name) { return result; } - private static void createIndex(RestClient client, TestDataset dataset, IndexCreator indexCreator) throws IOException { + private static void load(RestClient client, TestDataset dataset, Logger logger, IndexCreator indexCreator) throws IOException { + logger.info("Loading dataset [{}] into ES index [{}]", dataset.dataFileName, dataset.indexName); URL mapping = getResource("/" + dataset.mappingFileName); Settings indexSettings = dataset.readSettingsFile(); indexCreator.createIndex(client, dataset.indexName, readMappingFile(mapping, dataset.typeMapping), indexSettings); - } - private static void loadData(RestClient client, TestDataset dataset, Logger logger) throws IOException { - logger.info("Loading dataset [{}] into ES index [{}]", dataset.dataFileName, dataset.indexName); // Some examples only test that the query and mappings are valid, and don't need example data. Use .noData() for those if (dataset.dataFileName != null) { URL data = getResource("/data/" + dataset.dataFileName); diff --git a/x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java b/x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java index 612e007c4b13f..5b40e1d03e92f 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java @@ -11,18 +11,12 @@ import java.net.ConnectException; -import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.startsWith; public class CsvTestsDataLoaderTests extends ESTestCase { public void testCsvTestsDataLoaderExecution() { - Throwable cause = expectThrows(AssertionError.class, () -> CsvTestsDataLoader.main(new String[] {})); - // find the root cause - while (cause.getCause() != null) { - cause = cause.getCause(); - } - assertThat(cause, instanceOf(ConnectException.class)); - assertThat(cause.getMessage(), startsWith("Connection refused")); + ConnectException ce = expectThrows(ConnectException.class, () -> CsvTestsDataLoader.main(new String[] {})); + assertThat(ce.getMessage(), startsWith("Connection refused")); } } From e3ce07958c0e641d9ab3ee9689bdd4a33162c271 Mon Sep 17 00:00:00 2001 From: Craig Taverner Date: Mon, 3 Nov 2025 14:46:44 +0100 Subject: [PATCH 2/2] Revert "Revert PR #134086 (#134738)" This reverts commit 207787a027fad4cff92a438a8a699cd1df386e80. --- .../xpack/esql/ccq/MultiClusterSpecIT.java | 46 +++++++++++++++++-- 1 file changed, 42 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index 5f5408d7fa189..1e7879154ad75 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -15,6 +15,7 @@ import org.elasticsearch.Version; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseListener; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.IOUtils; @@ -38,6 +39,8 @@ import java.util.List; import java.util.Locale; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -251,10 +254,7 @@ static RestClient twoClients(RestClient localClient, RestClient remoteClient) th return bulkClient.performRequest(request); } else { Request[] clones = cloneRequests(request, 2); - Response resp1 = remoteClient.performRequest(clones[0]); - Response resp2 = localClient.performRequest(clones[1]); - assertEquals(resp1.getStatusLine().getStatusCode(), resp2.getStatusLine().getStatusCode()); - return resp2; + return runInParallel(localClient, remoteClient, clones); } }); doAnswer(invocation -> { @@ -289,6 +289,44 @@ static Request[] cloneRequests(Request orig, int numClones) throws IOException { return clones; } + /** + * Run {@link #cloneRequests cloned} requests in parallel. + */ + static Response runInParallel(RestClient localClient, RestClient remoteClient, Request[] clones) throws Throwable { + CompletableFuture remoteResponse = new CompletableFuture<>(); + CompletableFuture localResponse = new CompletableFuture<>(); + remoteClient.performRequestAsync(clones[0], new ResponseListener() { + @Override + public void onSuccess(Response response) { + remoteResponse.complete(response); + } + + @Override + public void onFailure(Exception exception) { + remoteResponse.completeExceptionally(exception); + } + }); + localClient.performRequestAsync(clones[1], new ResponseListener() { + @Override + public void onSuccess(Response response) { + localResponse.complete(response); + } + + @Override + public void onFailure(Exception exception) { + localResponse.completeExceptionally(exception); + } + }); + try { + Response remote = remoteResponse.get(); + Response local = localResponse.get(); + assertEquals(remote.getStatusLine().getStatusCode(), local.getStatusLine().getStatusCode()); + return local; + } catch (ExecutionException e) { + throw e.getCause(); + } + } + /** * Convert FROM employees ... => FROM *:employees,employees */