Skip to content

Commit d915090

Browse files
Revert "ESQL: Speed up MultiClusterSpecIT start up by 40% by parallelizing index creation (#134090)" (#134511)
This reverts commit 93d0896, which was merged with #134090. Unfortunately, it caused a flaky test, found by #134414. Closes #134414.
1 parent e5559ef commit d915090

File tree

2 files changed

+12
-43
lines changed

2 files changed

+12
-43
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java

Lines changed: 10 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import org.apache.http.client.CredentialsProvider;
1818
import org.apache.http.impl.client.BasicCredentialsProvider;
1919
import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
20-
import org.apache.lucene.util.IOConsumer;
2120
import org.elasticsearch.client.Request;
2221
import org.elasticsearch.client.Response;
2322
import org.elasticsearch.client.ResponseException;
@@ -31,7 +30,6 @@
3130
import org.elasticsearch.inference.TaskType;
3231
import org.elasticsearch.logging.LogManager;
3332
import org.elasticsearch.logging.Logger;
34-
import org.elasticsearch.test.ESTestCase;
3533
import org.elasticsearch.test.rest.ESRestTestCase;
3634
import org.elasticsearch.xcontent.XContentType;
3735

@@ -45,7 +43,6 @@
4543
import java.util.List;
4644
import java.util.Map;
4745
import java.util.Set;
48-
import java.util.concurrent.Semaphore;
4946
import java.util.regex.Matcher;
5047
import java.util.regex.Pattern;
5148
import java.util.stream.Collectors;
@@ -58,7 +55,6 @@
5855
import static org.elasticsearch.xpack.esql.EsqlTestUtils.reader;
5956

6057
public class CsvTestsDataLoader {
61-
private static final int PARALLEL_THREADS = 10;
6258
private static final int BULK_DATA_SIZE = 100_000;
6359
private static final TestDataset EMPLOYEES = new TestDataset("employees", "mapping-default.json", "employees.csv").noSubfields();
6460
private static final TestDataset EMPLOYEES_INCOMPATIBLE = new TestDataset(
@@ -404,35 +400,16 @@ private static void loadDataSetIntoEs(
404400
IndexCreator indexCreator
405401
) throws IOException {
406402
Logger logger = LogManager.getLogger(CsvTestsDataLoader.class);
407-
List<TestDataset> datasets = availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled).stream()
408-
.toList();
409403

410-
executeInParallel(datasets, dataset -> createIndex(client, dataset, indexCreator), "Failed to create indices in parallel");
411-
412-
executeInParallel(datasets, dataset -> loadData(client, dataset, logger), "Failed to load data in parallel");
413-
414-
forceMerge(client, datasets.stream().map(d -> d.indexName).collect(Collectors.toSet()), logger);
415-
416-
executeInParallel(
417-
ENRICH_POLICIES,
418-
policy -> loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger),
419-
"Failed to load enrich policies in parallel"
420-
);
421-
422-
}
423-
424-
private static <T> void executeInParallel(List<T> items, IOConsumer<T> consumer, String errorMessage) {
425-
Semaphore semaphore = new Semaphore(PARALLEL_THREADS);
426-
ESTestCase.runInParallel(items.size(), i -> {
427-
try {
428-
semaphore.acquire();
429-
consumer.accept(items.get(i));
430-
} catch (IOException | InterruptedException e) {
431-
throw new RuntimeException(errorMessage, e);
432-
} finally {
433-
semaphore.release();
434-
}
435-
});
404+
Set<String> loadedDatasets = new HashSet<>();
405+
for (var dataset : availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled)) {
406+
load(client, dataset, logger, indexCreator);
407+
loadedDatasets.add(dataset.indexName);
408+
}
409+
forceMerge(client, loadedDatasets, logger);
410+
for (var policy : ENRICH_POLICIES) {
411+
loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger);
412+
}
436413
}
437414

438415
public static void createInferenceEndpoints(RestClient client) throws IOException {
@@ -589,13 +566,11 @@ private static URL getResource(String name) {
589566
return result;
590567
}
591568

592-
private static void createIndex(RestClient client, TestDataset dataset, IndexCreator indexCreator) throws IOException {
569+
private static void load(RestClient client, TestDataset dataset, Logger logger, IndexCreator indexCreator) throws IOException {
593570
URL mapping = getResource("/" + dataset.mappingFileName);
594571
Settings indexSettings = dataset.readSettingsFile();
595572
indexCreator.createIndex(client, dataset.indexName, readMappingFile(mapping, dataset.typeMapping), indexSettings);
596-
}
597573

598-
private static void loadData(RestClient client, TestDataset dataset, Logger logger) throws IOException {
599574
// Some examples only test that the query and mappings are valid, and don't need example data. Use .noData() for those
600575
if (dataset.dataFileName != null) {
601576
URL data = getResource("/data/" + dataset.dataFileName);

x-pack/plugin/esql/qa/testFixtures/src/test/java/org/elasticsearch/xpack/esql/CsvTestsDataLoaderTests.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,12 @@
1111

1212
import java.net.ConnectException;
1313

14-
import static org.hamcrest.Matchers.instanceOf;
1514
import static org.hamcrest.Matchers.startsWith;
1615

1716
public class CsvTestsDataLoaderTests extends ESTestCase {
1817

1918
public void testCsvTestsDataLoaderExecution() {
20-
Throwable cause = expectThrows(AssertionError.class, () -> CsvTestsDataLoader.main(new String[] {}));
21-
// find the root cause
22-
while (cause.getCause() != null) {
23-
cause = cause.getCause();
24-
}
25-
assertThat(cause, instanceOf(ConnectException.class));
26-
assertThat(cause.getMessage(), startsWith("Connection refused"));
19+
ConnectException ce = expectThrows(ConnectException.class, () -> CsvTestsDataLoader.main(new String[] {}));
20+
assertThat(ce.getMessage(), startsWith("Connection refused"));
2721
}
2822
}

0 commit comments

Comments
 (0)