Skip to content

Commit 4d3e27f

Browse files
Re-enable some performance updates to ES|QL spec tests after resilience improvement to EsqlSpecTestCase (#136781)
* Revert "Revert PR #134086 (#134738)"- This reverts commit 207787a. * Reapply "ESQL: Speed up MultiClusterSpecIT start up by 40% by parallelizing index creation (#134090)" (#134511) - This reverts commit d915090.
1 parent c3f1ae2 commit 4d3e27f

File tree

3 files changed

+91
-17
lines changed

3 files changed

+91
-17
lines changed

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.Version;
1616
import org.elasticsearch.client.Request;
1717
import org.elasticsearch.client.Response;
18+
import org.elasticsearch.client.ResponseListener;
1819
import org.elasticsearch.client.RestClient;
1920
import org.elasticsearch.common.settings.Settings;
2021
import org.elasticsearch.core.IOUtils;
@@ -38,6 +39,8 @@
3839
import java.util.List;
3940
import java.util.Locale;
4041
import java.util.Set;
42+
import java.util.concurrent.CompletableFuture;
43+
import java.util.concurrent.ExecutionException;
4144
import java.util.regex.Pattern;
4245
import java.util.stream.Collectors;
4346

@@ -251,10 +254,7 @@ static RestClient twoClients(RestClient localClient, RestClient remoteClient) th
251254
return bulkClient.performRequest(request);
252255
} else {
253256
Request[] clones = cloneRequests(request, 2);
254-
Response resp1 = remoteClient.performRequest(clones[0]);
255-
Response resp2 = localClient.performRequest(clones[1]);
256-
assertEquals(resp1.getStatusLine().getStatusCode(), resp2.getStatusLine().getStatusCode());
257-
return resp2;
257+
return runInParallel(localClient, remoteClient, clones);
258258
}
259259
});
260260
doAnswer(invocation -> {
@@ -289,6 +289,44 @@ static Request[] cloneRequests(Request orig, int numClones) throws IOException {
289289
return clones;
290290
}
291291

292+
/**
293+
* Run {@link #cloneRequests cloned} requests in parallel.
294+
*/
295+
static Response runInParallel(RestClient localClient, RestClient remoteClient, Request[] clones) throws Throwable {
296+
CompletableFuture<Response> remoteResponse = new CompletableFuture<>();
297+
CompletableFuture<Response> localResponse = new CompletableFuture<>();
298+
remoteClient.performRequestAsync(clones[0], new ResponseListener() {
299+
@Override
300+
public void onSuccess(Response response) {
301+
remoteResponse.complete(response);
302+
}
303+
304+
@Override
305+
public void onFailure(Exception exception) {
306+
remoteResponse.completeExceptionally(exception);
307+
}
308+
});
309+
localClient.performRequestAsync(clones[1], new ResponseListener() {
310+
@Override
311+
public void onSuccess(Response response) {
312+
localResponse.complete(response);
313+
}
314+
315+
@Override
316+
public void onFailure(Exception exception) {
317+
localResponse.completeExceptionally(exception);
318+
}
319+
});
320+
try {
321+
Response remote = remoteResponse.get();
322+
Response local = localResponse.get();
323+
assertEquals(remote.getStatusLine().getStatusCode(), local.getStatusLine().getStatusCode());
324+
return local;
325+
} catch (ExecutionException e) {
326+
throw e.getCause();
327+
}
328+
}
329+
292330
/**
293331
* Convert FROM employees ... => FROM *:employees,employees
294332
*/

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.apache.http.client.CredentialsProvider;
1818
import org.apache.http.impl.client.BasicCredentialsProvider;
1919
import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
20+
import org.apache.lucene.util.IOConsumer;
2021
import org.elasticsearch.client.Request;
2122
import org.elasticsearch.client.Response;
2223
import org.elasticsearch.client.ResponseException;
@@ -30,6 +31,7 @@
3031
import org.elasticsearch.inference.TaskType;
3132
import org.elasticsearch.logging.LogManager;
3233
import org.elasticsearch.logging.Logger;
34+
import org.elasticsearch.test.ESTestCase;
3335
import org.elasticsearch.test.rest.ESRestTestCase;
3436
import org.elasticsearch.xcontent.XContentType;
3537

@@ -43,6 +45,7 @@
4345
import java.util.List;
4446
import java.util.Map;
4547
import java.util.Set;
48+
import java.util.concurrent.Semaphore;
4649
import java.util.regex.Matcher;
4750
import java.util.regex.Pattern;
4851
import java.util.stream.Collectors;
@@ -55,6 +58,7 @@
5558
import static org.elasticsearch.xpack.esql.EsqlTestUtils.reader;
5659

5760
public class CsvTestsDataLoader {
61+
private static final int PARALLEL_THREADS = 10;
5862
private static final int BULK_DATA_SIZE = 100_000;
5963
private static final TestDataset EMPLOYEES = new TestDataset("employees", "mapping-default.json", "employees.csv").noSubfields();
6064
private static final TestDataset EMPLOYEES_INCOMPATIBLE = new TestDataset(
@@ -429,18 +433,42 @@ private static void loadDataSetIntoEs(
429433
IndexCreator indexCreator
430434
) throws IOException {
431435
Logger logger = LogManager.getLogger(CsvTestsDataLoader.class);
436+
List<TestDataset> datasets = availableDatasetsForEs(
437+
supportsIndexModeLookup,
438+
supportsSourceFieldMapping,
439+
inferenceEnabled,
440+
timeSeriesOnly
441+
).stream().toList();
442+
443+
logger.info("Creating test indices");
444+
executeInParallel(datasets, dataset -> createIndex(client, dataset, indexCreator), "Failed to create indices in parallel");
432445

433-
Set<String> loadedDatasets = new HashSet<>();
434446
logger.info("Loading test datasets");
435-
for (var dataset : availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled, timeSeriesOnly)) {
436-
load(client, dataset, logger, indexCreator);
437-
loadedDatasets.add(dataset.indexName);
438-
}
439-
forceMerge(client, loadedDatasets, logger);
447+
executeInParallel(datasets, dataset -> loadData(client, dataset, logger), "Failed to load data in parallel");
448+
449+
forceMerge(client, datasets.stream().map(d -> d.indexName).collect(Collectors.toSet()), logger);
450+
440451
logger.info("Loading enrich policies");
441-
for (var policy : ENRICH_POLICIES) {
442-
loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger);
443-
}
452+
executeInParallel(
453+
ENRICH_POLICIES,
454+
policy -> loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger),
455+
"Failed to load enrich policies in parallel"
456+
);
457+
458+
}
459+
460+
private static <T> void executeInParallel(List<T> items, IOConsumer<T> consumer, String errorMessage) {
461+
Semaphore semaphore = new Semaphore(PARALLEL_THREADS);
462+
ESTestCase.runInParallel(items.size(), i -> {
463+
try {
464+
semaphore.acquire();
465+
consumer.accept(items.get(i));
466+
} catch (IOException | InterruptedException e) {
467+
throw new RuntimeException(errorMessage, e);
468+
} finally {
469+
semaphore.release();
470+
}
471+
});
444472
}
445473

446474
public static void createInferenceEndpoints(RestClient client) throws IOException {
@@ -598,12 +626,14 @@ private static URL getResource(String name) {
598626
return result;
599627
}
600628

601-
private static void load(RestClient client, TestDataset dataset, Logger logger, IndexCreator indexCreator) throws IOException {
602-
logger.info("Loading dataset [{}] into ES index [{}]", dataset.dataFileName, dataset.indexName);
629+
private static void createIndex(RestClient client, TestDataset dataset, IndexCreator indexCreator) throws IOException {
603630
URL mapping = getResource("/" + dataset.mappingFileName);
604631
Settings indexSettings = dataset.readSettingsFile();
605632
indexCreator.createIndex(client, dataset.indexName, readMappingFile(mapping, dataset.typeMapping), indexSettings);
633+
}
606634

635+
private static void loadData(RestClient client, TestDataset dataset, Logger logger) throws IOException {
636+
logger.info("Loading dataset [{}] into ES index [{}]", dataset.dataFileName, dataset.indexName);
607637
// Some examples only test that the query and mappings are valid, and don't need example data. Use .noData() for those
608638
if (dataset.dataFileName != null) {
609639
URL data = getResource("/data/" + dataset.dataFileName);

x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,18 @@
1111

1212
import java.net.ConnectException;
1313

14+
import static org.hamcrest.Matchers.instanceOf;
1415
import static org.hamcrest.Matchers.startsWith;
1516

1617
public class CsvTestsDataLoaderTests extends ESTestCase {
1718

1819
public void testCsvTestsDataLoaderExecution() {
19-
ConnectException ce = expectThrows(ConnectException.class, () -> CsvTestsDataLoader.main(new String[] {}));
20-
assertThat(ce.getMessage(), startsWith("Connection refused"));
20+
Throwable cause = expectThrows(AssertionError.class, () -> CsvTestsDataLoader.main(new String[] {}));
21+
// find the root cause
22+
while (cause.getCause() != null) {
23+
cause = cause.getCause();
24+
}
25+
assertThat(cause, instanceOf(ConnectException.class));
26+
assertThat(cause.getMessage(), startsWith("Connection refused"));
2127
}
2228
}

0 commit comments

Comments
 (0)