Skip to content

Conversation

@phananh1010
Copy link
Owner

Some recent test performance updates flushed out test fragility due to non-thread safety and other concurrency issues with test setup, and so two optimizations were reverted. A recent PR at elastic#136780 added improved resiliency to concurrency issues in test setup which should allow us to bring back the performance optimizations.

The resiliency fix included in elastic#136780 was originally extracted from the Views PR at elastic#134995, which, due to early issues in view creation/deleting during test setup was reliably exposing the same errors seen in elastic#134736, and so it seems likely it will also fix the issues seen in the optimizations.

Checklist:

Fixes elastic#134890

BASE=9770442d77ceac0dda6f6e6a07d0d386885ab52a
HEAD=955fbccc5328132bd5bcbb95bab60ca5b355e8ca
Branch=main
Copilot AI review requested due to automatic review settings November 7, 2025 22:03
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR improves data loading performance by introducing parallel execution for creating indices, loading data, and setting up enrich policies. It also updates test code to handle exceptions properly and adds concurrent request handling in multi-cluster tests.

Key changes:

  • Parallelized data loading operations using a semaphore-based approach with 10 concurrent threads
  • Refactored data loading to separate index creation from data ingestion
  • Added asynchronous request execution in multi-cluster tests using CompletableFuture

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

File Description
CsvTestsDataLoader.java Introduced parallel execution for index creation, data loading, and enrich policy setup; split monolithic load method into createIndex and loadData
CsvTestsDataLoaderTests.java Updated test to expect AssertionError wrapping ConnectException due to parallel execution framework
MultiClusterSpecIT.java Added runInParallel method to execute requests concurrently on local and remote clients using async APIs

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +460 to 472
private static <T> void executeInParallel(List<T> items, IOConsumer<T> consumer, String errorMessage) {
Semaphore semaphore = new Semaphore(PARALLEL_THREADS);
ESTestCase.runInParallel(items.size(), i -> {
try {
semaphore.acquire();
consumer.accept(items.get(i));
} catch (IOException | InterruptedException e) {
throw new RuntimeException(errorMessage, e);
} finally {
semaphore.release();
}
});
}
Copy link

Copilot AI Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semaphore acquisition is happening inside the parallel tasks, which defeats its purpose as a concurrency limiter. Since all tasks start simultaneously via runInParallel, they will all attempt to acquire the semaphore at once, causing most threads to block immediately.

The semaphore should limit the number of concurrently executing tasks, but the current implementation doesn't achieve this effectively because:

  1. All tasks are spawned upfront by runInParallel(items.size(), ...)
  2. Each task then tries to acquire the semaphore

A better approach would be to use a thread pool with a fixed size (e.g., ExecutorService with PARALLEL_THREADS threads) or restructure the code to submit tasks to the semaphore-controlled executor rather than spawning all threads first.

Copilot uses AI. Check for mistakes.
Response remote = remoteResponse.get();
Response local = localResponse.get();
assertEquals(remote.getStatusLine().getStatusCode(), local.getStatusLine().getStatusCode());
return local;
Copy link

Copilot AI Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The runInParallel method doesn't handle InterruptedException properly when calling CompletableFuture.get(). If the thread is interrupted while waiting for the futures, the InterruptedException will be caught by the ExecutionException catch block and its cause will be thrown, losing the interrupt status.

Add a specific catch block for InterruptedException that restores the interrupt status before rethrowing:

} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw e;
} catch (ExecutionException e) {
    throw e.getCause();
}
Suggested change
return local;
return local;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw e;

Copilot uses AI. Check for mistakes.
@phananh1010 phananh1010 closed this Nov 7, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants