Commit 15c6458

Reapply "ESQL: Speed up MultiClusterSpecIT start up by 40% by parallelizing index creation (elastic#134090)" (elastic#134511)
This reverts commit d915090.
1 parent c13c071 commit 15c6458

File tree: 2 files changed (+47, −12 lines)


x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java

Lines changed: 39 additions & 10 deletions
@@ -17,6 +17,7 @@
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
+import org.apache.lucene.util.IOConsumer;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
@@ -30,6 +31,7 @@
 import org.elasticsearch.inference.TaskType;
 import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
+import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.xcontent.XContentType;
@@ -43,6 +45,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Semaphore;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -55,6 +58,7 @@
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.reader;
 
 public class CsvTestsDataLoader {
+    private static final int PARALLEL_THREADS = 10;
     private static final int BULK_DATA_SIZE = 100_000;
     private static final TestDataset EMPLOYEES = new TestDataset("employees", "mapping-default.json", "employees.csv").noSubfields();
     private static final TestDataset EMPLOYEES_INCOMPATIBLE = new TestDataset(
@@ -421,16 +425,39 @@ private static void loadDataSetIntoEs(
         IndexCreator indexCreator
     ) throws IOException {
         Logger logger = LogManager.getLogger(CsvTestsDataLoader.class);
+        List<TestDataset> datasets = availableDatasetsForEs(
+            supportsIndexModeLookup,
+            supportsSourceFieldMapping,
+            inferenceEnabled,
+            timeSeriesOnly
+        ).stream().toList();
+
+        executeInParallel(datasets, dataset -> createIndex(client, dataset, indexCreator), "Failed to create indices in parallel");
+
+        executeInParallel(datasets, dataset -> loadData(client, dataset, logger), "Failed to load data in parallel");
+
+        forceMerge(client, datasets.stream().map(d -> d.indexName).collect(Collectors.toSet()), logger);
+
+        executeInParallel(
+            ENRICH_POLICIES,
+            policy -> loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger),
+            "Failed to load enrich policies in parallel"
+        );
 
-        Set<String> loadedDatasets = new HashSet<>();
-        for (var dataset : availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled, timeSeriesOnly)) {
-            load(client, dataset, logger, indexCreator);
-            loadedDatasets.add(dataset.indexName);
-        }
-        forceMerge(client, loadedDatasets, logger);
-        for (var policy : ENRICH_POLICIES) {
-            loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger);
-        }
+    }
+
+    private static <T> void executeInParallel(List<T> items, IOConsumer<T> consumer, String errorMessage) {
+        Semaphore semaphore = new Semaphore(PARALLEL_THREADS);
+        ESTestCase.runInParallel(items.size(), i -> {
+            try {
+                semaphore.acquire();
+                consumer.accept(items.get(i));
+            } catch (IOException | InterruptedException e) {
+                throw new RuntimeException(errorMessage, e);
+            } finally {
+                semaphore.release();
+            }
+        });
     }
 
     public static void createInferenceEndpoints(RestClient client) throws IOException {
@@ -587,11 +614,13 @@ private static URL getResource(String name) {
         return result;
     }
 
-    private static void load(RestClient client, TestDataset dataset, Logger logger, IndexCreator indexCreator) throws IOException {
+    private static void createIndex(RestClient client, TestDataset dataset, IndexCreator indexCreator) throws IOException {
        URL mapping = getResource("/" + dataset.mappingFileName);
        Settings indexSettings = dataset.readSettingsFile();
        indexCreator.createIndex(client, dataset.indexName, readMappingFile(mapping, dataset.typeMapping), indexSettings);
+    }
 
+    private static void loadData(RestClient client, TestDataset dataset, Logger logger) throws IOException {
         // Some examples only test that the query and mappings are valid, and don't need example data. Use .noData() for those
         if (dataset.dataFileName != null) {
             URL data = getResource("/data/" + dataset.dataFileName);
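
For readers unfamiliar with the pattern above: the new executeInParallel helper relies on ESTestCase.runInParallel, which is Elasticsearch test infrastructure. Below is a rough, standalone sketch of the same idea using only the JDK; the ExecutorService, the sample item list, and the class/field names (ParallelIndexSetupSketch, "indices") are stand-ins for illustration, not part of the commit.

// Standalone sketch (JDK only): spawn one task per item, but cap concurrency
// with a Semaphore, mirroring the executeInParallel helper in the diff above.
// ESTestCase.runInParallel is replaced here by a plain ExecutorService.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

public class ParallelIndexSetupSketch {
    private static final int PARALLEL_THREADS = 10;

    interface IOConsumer<T> {
        void accept(T item) throws IOException;
    }

    static <T> void executeInParallel(List<T> items, IOConsumer<T> consumer, String errorMessage) throws InterruptedException {
        Semaphore semaphore = new Semaphore(PARALLEL_THREADS);
        ExecutorService executor = Executors.newCachedThreadPool(); // one task per item, like runInParallel
        List<Future<?>> futures = new ArrayList<>();
        for (T item : items) {
            futures.add(executor.submit(() -> {
                try {
                    semaphore.acquire();            // at most PARALLEL_THREADS items in flight
                    consumer.accept(item);
                } catch (IOException | InterruptedException e) {
                    throw new RuntimeException(errorMessage, e);
                } finally {
                    semaphore.release();
                }
                return null;
            }));
        }
        try {
            for (Future<?> future : futures) {
                future.get();                       // propagate the first failure, if any
            }
        } catch (ExecutionException e) {
            throw new RuntimeException(errorMessage, e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Hypothetical "indices" standing in for the real TestDataset list.
        List<String> indices = List.of("employees", "apps", "languages");
        executeInParallel(indices, index -> System.out.println("creating " + index), "Failed to create indices in parallel");
    }
}

The Semaphore is what keeps concurrency at PARALLEL_THREADS (10) even though a task exists per dataset, presumably so parallel index creation, bulk loading, and enrich-policy setup do not overload the test cluster.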

x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java

Lines changed: 8 additions & 2 deletions
@@ -11,12 +11,18 @@
 
 import java.net.ConnectException;
 
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.startsWith;
 
 public class CsvTestsDataLoaderTests extends ESTestCase {
 
     public void testCsvTestsDataLoaderExecution() {
-        ConnectException ce = expectThrows(ConnectException.class, () -> CsvTestsDataLoader.main(new String[] {}));
-        assertThat(ce.getMessage(), startsWith("Connection refused"));
+        Throwable cause = expectThrows(AssertionError.class, () -> CsvTestsDataLoader.main(new String[] {}));
+        // find the root cause
+        while (cause.getCause() != null) {
+            cause = cause.getCause();
+        }
+        assertThat(cause, instanceOf(ConnectException.class));
+        assertThat(cause.getMessage(), startsWith("Connection refused"));
     }
 }
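
The reworked test expects the loader's failure to surface wrapped (as an AssertionError) rather than as a bare ConnectException, so it walks the cause chain down to the root before asserting. A small standalone illustration of that unwrapping follows; the nested exception chain here is hypothetical, built only to mimic the shape of the real failure.

// Sketch of the root-cause unwrapping used in the updated test: walk
// getCause() to the bottom of the chain, then inspect the innermost failure.
import java.net.ConnectException;

public class RootCauseSketch {

    static Throwable rootCause(Throwable t) {
        Throwable cause = t;
        while (cause.getCause() != null) {
            cause = cause.getCause();
        }
        return cause;
    }

    public static void main(String[] args) {
        // Hypothetical chain mimicking the shape of the real failure:
        // AssertionError -> RuntimeException (wrapper message) -> ConnectException.
        Throwable failure = new AssertionError(
            new RuntimeException("Failed to create indices in parallel", new ConnectException("Connection refused"))
        );
        Throwable root = rootCause(failure);
        System.out.println(root instanceof ConnectException);                    // true
        System.out.println(root.getMessage().startsWith("Connection refused"));  // true
    }
}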
