Skip to content

Commit 5214c3d

Browse files
Refactor to use ESTestCase.runInParallel()
1 parent 8d30e9f commit 5214c3d

File tree

1 file changed

+24
-47
lines changed

1 file changed

+24
-47
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java

Lines changed: 24 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.apache.http.impl.client.BasicCredentialsProvider;
1919
import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
2020
import org.apache.lucene.util.IOConsumer;
21-
import org.elasticsearch.ExceptionsHelper;
2221
import org.elasticsearch.client.Request;
2322
import org.elasticsearch.client.Response;
2423
import org.elasticsearch.client.ResponseException;
@@ -32,6 +31,7 @@
3231
import org.elasticsearch.inference.TaskType;
3332
import org.elasticsearch.logging.LogManager;
3433
import org.elasticsearch.logging.Logger;
34+
import org.elasticsearch.test.ESTestCase;
3535
import org.elasticsearch.test.rest.ESRestTestCase;
3636
import org.elasticsearch.xcontent.XContentType;
3737

@@ -45,9 +45,7 @@
4545
import java.util.List;
4646
import java.util.Map;
4747
import java.util.Set;
48-
import java.util.concurrent.ExecutorService;
49-
import java.util.concurrent.Executors;
50-
import java.util.concurrent.Future;
48+
import java.util.concurrent.Semaphore;
5149
import java.util.stream.Collectors;
5250
import java.util.stream.Stream;
5351

@@ -404,56 +402,35 @@ private static void loadDataSetIntoEs(
404402
IndexCreator indexCreator
405403
) throws IOException {
406404
Logger logger = LogManager.getLogger(CsvTestsDataLoader.class);
407-
Set<TestDataset> datasets = availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled);
408-
ExecutorService executor = Executors.newFixedThreadPool(PARALLEL_THREADS);
409-
try {
410-
executeInParallel(
411-
executor,
412-
datasets,
413-
dataset -> createIndex(client, dataset, indexCreator),
414-
"Failed to create indices in parallel"
415-
);
405+
List<TestDataset> datasets = availableDatasetsForEs(supportsIndexModeLookup, supportsSourceFieldMapping, inferenceEnabled).stream()
406+
.toList();
416407

417-
executeInParallel(executor, datasets, dataset -> loadData(client, dataset, logger), "Failed to load data in parallel");
408+
executeInParallel(datasets, dataset -> createIndex(client, dataset, indexCreator), "Failed to create indices in parallel");
418409

419-
forceMerge(client, datasets.stream().map(d -> d.indexName).collect(Collectors.toSet()), logger);
410+
executeInParallel(datasets, dataset -> loadData(client, dataset, logger), "Failed to load data in parallel");
420411

421-
executeInParallel(
422-
executor,
423-
ENRICH_POLICIES,
424-
policy -> loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger),
425-
"Failed to load enrich policies in parallel"
426-
);
427-
} finally {
428-
executor.shutdown();
429-
}
430-
}
412+
forceMerge(client, datasets.stream().map(d -> d.indexName).collect(Collectors.toSet()), logger);
431413

432-
private static <T> void executeInParallel(ExecutorService executor, Iterable<T> items, IOConsumer<T> consumer, String errorMessage)
433-
throws IOException {
434-
List<Future<?>> futures = new ArrayList<>();
435-
for (T item : items) {
436-
futures.add(executor.submit(() -> {
437-
try {
438-
consumer.accept(item);
439-
} catch (IOException e) {
440-
throw new RuntimeException(e);
441-
}
442-
}));
443-
}
414+
executeInParallel(
415+
ENRICH_POLICIES,
416+
policy -> loadEnrichPolicy(client, policy.policyName, policy.policyFileName, logger),
417+
"Failed to load enrich policies in parallel"
418+
);
419+
420+
}
444421

445-
RuntimeException exception = null;
446-
for (Future<?> future : futures) {
422+
private static <T> void executeInParallel(List<T> items, IOConsumer<T> consumer, String errorMessage) {
423+
Semaphore semaphore = new Semaphore(PARALLEL_THREADS);
424+
ESTestCase.runInParallel(items.size(), i -> {
447425
try {
448-
future.get();
449-
} catch (Exception e) {
450-
exception = ExceptionsHelper.useOrSuppress(exception, ExceptionsHelper.convertToRuntime(e));
426+
semaphore.acquire();
427+
consumer.accept(items.get(i));
428+
} catch (IOException | InterruptedException e) {
429+
throw new RuntimeException(errorMessage, e);
430+
} finally {
431+
semaphore.release();
451432
}
452-
}
453-
454-
if (exception != null) {
455-
throw new IOException(errorMessage, exception);
456-
}
433+
});
457434
}
458435

459436
public static void createInferenceEndpoints(RestClient client) throws IOException {

0 commit comments

Comments
 (0)