Commit 93d0896

ESQL: Speed up MultiClusterSpecIT start up by 40% by parallelizing index creation (#134090)
Speeds up MultiClusterSpecIT by parallelizing the calls in CsvTestsDataLoader that create indices, load data into them, and create enrich policies. Drops the runtime of the first test from 117 sec to 70 sec for me locally in IntelliJ.
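
For readers outside the Elasticsearch test framework, the pattern behind the change can be sketched in plain JDK terms. This is a hypothetical illustration, not the project's code: the real implementation (in the diff below) uses ESTestCase.runInParallel together with a Semaphore capped at PARALLEL_THREADS, while this sketch substitutes a fixed-size ExecutorService; the IoTask interface and the dataset names are placeholders.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative sketch: run one task per item with at most PARALLEL_THREADS
// executing at once, mirroring the idea of the change (the real code uses
// ESTestCase.runInParallel plus a Semaphore instead of a fixed pool).
public class ParallelLoadSketch {

    private static final int PARALLEL_THREADS = 10; // same cap the commit introduces

    interface IoTask<T> {
        void accept(T item) throws IOException; // stand-in for org.apache.lucene.util.IOConsumer
    }

    static <T> void executeInParallel(List<T> items, IoTask<T> task, String errorMessage) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(PARALLEL_THREADS);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (T item : items) {
                futures.add(pool.submit(() -> {
                    try {
                        task.accept(item);
                    } catch (IOException e) {
                        throw new RuntimeException(errorMessage, e);
                    }
                }));
            }
            for (Future<?> future : futures) {
                try {
                    future.get(); // surface failures from the worker threads
                } catch (ExecutionException e) {
                    throw new RuntimeException(errorMessage, e.getCause());
                }
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // The commit applies the same helper to three phases, one after another:
        // create all indices, bulk-load all data, then create the enrich policies.
        List<String> datasets = List.of("employees", "languages", "sample_data"); // placeholder names
        executeInParallel(datasets, d -> System.out.println("create index " + d), "Failed to create indices in parallel");
        executeInParallel(datasets, d -> System.out.println("load data into " + d), "Failed to load data in parallel");
    }
}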
1 parent dc743a9 commit 93d0896

File tree

2 files changed: 43 additions, 12 deletions

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java

Lines changed: 35 additions & 10 deletions
@@ -17,6 +17,7 @@
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
+import org.apache.lucene.util.IOConsumer;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
@@ -30,6 +31,7 @@
 import org.elasticsearch.inference.TaskType;
 import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
+import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.xcontent.XContentType;
 
@@ -43,6 +45,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Semaphore;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -53,6 +56,7 @@
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.reader;
 
 public class CsvTestsDataLoader {
+    private static final int PARALLEL_THREADS = 10;
     private static final int BULK_DATA_SIZE = 100_000;
     private static final TestDataset EMPLOYEES = new TestDataset("employees", "mapping-default.json", "employees.csv").noSubfields();
     private static final TestDataset EMPLOYEES_INCOMPATIBLE = new TestDataset(
@@ -398,16 +402,35 @@ private static void loadDataSetIntoEs(
         IndexCreator indexCreator
     ) throws IOException {
         Logger logger = LogManager.getLogger(CsvTestsDataLoader.class);
+        List<TestDataset> datasets = availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled).stream()
+            .toList();
+
+        executeInParallel(datasets, dataset -> createIndex(client, dataset, indexCreator), "Failed to create indices in parallel");
+
+        executeInParallel(datasets, dataset -> loadData(client, dataset, logger), "Failed to load data in parallel");
+
+        forceMerge(client, datasets.stream().map(d -> d.indexName).collect(Collectors.toSet()), logger);
+
+        executeInParallel(
+            ENRICH_POLICIES,
+            policy -> loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger),
+            "Failed to load enrich policies in parallel"
+        );
 
-        Set<String> loadedDatasets = new HashSet<>();
-        for (var dataset : availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled)) {
-            load(client, dataset, logger, indexCreator);
-            loadedDatasets.add(dataset.indexName);
-        }
-        forceMerge(client, loadedDatasets, logger);
-        for (var policy : ENRICH_POLICIES) {
-            loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger);
-        }
+    }
+
+    private static <T> void executeInParallel(List<T> items, IOConsumer<T> consumer, String errorMessage) {
+        Semaphore semaphore = new Semaphore(PARALLEL_THREADS);
+        ESTestCase.runInParallel(items.size(), i -> {
+            try {
+                semaphore.acquire();
+                consumer.accept(items.get(i));
+            } catch (IOException | InterruptedException e) {
+                throw new RuntimeException(errorMessage, e);
+            } finally {
+                semaphore.release();
+            }
+        });
     }
 
     public static void createInferenceEndpoints(RestClient client) throws IOException {
@@ -564,11 +587,13 @@ private static URL getResource(String name) {
         return result;
     }
 
-    private static void load(RestClient client, TestDataset dataset, Logger logger, IndexCreator indexCreator) throws IOException {
+    private static void createIndex(RestClient client, TestDataset dataset, IndexCreator indexCreator) throws IOException {
         URL mapping = getResource("/" + dataset.mappingFileName);
         Settings indexSettings = dataset.readSettingsFile();
        indexCreator.createIndex(client, dataset.indexName, readMappingFile(mapping, dataset.typeMapping), indexSettings);
+    }
 
+    private static void loadData(RestClient client, TestDataset dataset, Logger logger) throws IOException {
         // Some examples only test that the query and mappings are valid, and don't need example data. Use .noData() for those
         if (dataset.dataFileName != null) {
             URL data = getResource("/data/" + dataset.dataFileName);
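
A note on failure propagation, inferred from the diff rather than stated in the commit message: executeInParallel rewraps any IOException from a task in a RuntimeException, and the parallel runner surfaces worker failures as an AssertionError (which is what the updated unit test below now expects), so a connection failure no longer reaches the caller as a bare ConnectException. A minimal, hypothetical sketch of the resulting unwrap, with the exact nesting shown as an example only:

import java.net.ConnectException;

// Hypothetical illustration of the failure nesting the test below accounts for:
// the parallel helpers wrap the original I/O failure, so the test walks
// getCause() down to the root ConnectException instead of catching it directly.
public class UnwrapSketch {
    public static void main(String[] args) {
        Throwable failure = new AssertionError(
            "task failed",
            new RuntimeException("Failed to create indices in parallel", new ConnectException("Connection refused"))
        );
        Throwable cause = failure;
        while (cause.getCause() != null) {
            cause = cause.getCause();
        }
        System.out.println(cause instanceof ConnectException); // true
        System.out.println(cause.getMessage());                // Connection refused
    }
}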

x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java

Lines changed: 8 additions & 2 deletions
@@ -11,12 +11,18 @@
 
 import java.net.ConnectException;
 
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.startsWith;
 
 public class CsvTestsDataLoaderTests extends ESTestCase {
 
     public void testCsvTestsDataLoaderExecution() {
-        ConnectException ce = expectThrows(ConnectException.class, () -> CsvTestsDataLoader.main(new String[] {}));
-        assertThat(ce.getMessage(), startsWith("Connection refused"));
+        Throwable cause = expectThrows(AssertionError.class, () -> CsvTestsDataLoader.main(new String[] {}));
+        // find the root cause
+        while (cause.getCause() != null) {
+            cause = cause.getCause();
+        }
+        assertThat(cause, instanceOf(ConnectException.class));
+        assertThat(cause.getMessage(), startsWith("Connection refused"));
     }
 }
