-
Notifications
You must be signed in to change notification settings - Fork 40
Backport to branch(3) : Refactor ParallelExecutor #2824
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Summary of Changes
Hello @feeblefakie, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request is an automated backport focused on refactoring the ParallelExecutor class. The primary goal was to enhance the robustness and clarity of exception handling during both parallel and serial task execution, improve efficiency for single-task scenarios, and significantly expand test coverage to ensure the reliability of these changes.
Highlights
- Improved Exception Handling: Refactored the exception handling logic within
ParallelExecutorto more robustly collect and rethrow exceptions. Specifically,Throwable.addSuppressed()is now used to aggregate multiple exceptions whenstopOnErroris false, ensuring all failures are reported. A new private helper method,rethrow, was introduced to centralize the logic for rethrowing specific exception types. - Execution Optimization: Optimized the execution path for single tasks when
noWaitis false, allowing them to be run directly instead of being routed through the serial execution mechanism, which reduces overhead. - Enhanced Testability: The
executeTasksmethod's visibility was changed to@VisibleForTestingto facilitate more comprehensive unit testing of its internal logic and various execution scenarios. - Comprehensive Unit Tests: Added a suite of new unit tests for the
executeTasksmethod inParallelExecutorTest.java, covering parallel and serial execution, single-task optimization, and variousstopOnErrorbehaviors, including scenarios with multiple exceptions.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.
| Feature | Command | Description |
|---|---|---|
| Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
| Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
| Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
The pull request refactors ParallelExecutor to improve exception handling and code structure. The core changes involve using Throwable.addSuppressed() to aggregate multiple exceptions when stopOnError is false, and introducing a rethrow() helper method to reduce code duplication. While these are excellent improvements, the refactoring has inadvertently removed critical exception logging from the serial execution paths (both for single-task optimization and the general serial case). Additionally, a new test case includes an unnecessary mock setup. My review comments focus on restoring the logging and cleaning up the test.
| // If there is only one task and noWait is false, we can run it directly without parallel | ||
| // execution. | ||
| executeTasksSerially(tasks, stopOnError, taskName, transactionId); | ||
| tasks.get(0).run(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The direct execution of a single task (tasks.get(0).run()) on this line removes the logging of potential exceptions that was present in the previous executeTasksSerially call. This is a regression in observability and could hinder debugging. Consider wrapping this call in a try-catch block to restore the warning logging for exceptions like ExecutionException, ValidationConflictException, CrudException, and RuntimeException.
| executeTasksInParallel(tasks, noWait, stopOnError, taskName, transactionId); | ||
| } else { | ||
| executeTasksSerially(tasks, stopOnError, taskName, transactionId); | ||
| executeTasksSerially(tasks, stopOnError); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The executeTasksSerially method is now called without taskName and transactionId. This change, combined with the modification in executeTasksSerially itself, removes the warning logging for failed tasks in serial execution. This is a regression in observability. The executeTasksSerially method should ideally retain these parameters to allow for proper logging.
| executeTasksSerially(tasks, stopOnError); | |
| executeTasksSerially(tasks, stopOnError, taskName, transactionId); |
| private void executeTasksSerially(List<ParallelExecutorTask> tasks, boolean stopOnError) | ||
| throws ExecutionException, ValidationConflictException, CrudException { | ||
| Exception exception = null; | ||
| for (ParallelExecutorTask task : tasks) { | ||
| try { | ||
| task.run(); | ||
| } catch (ExecutionException | ValidationConflictException | CrudException e) { | ||
| logger.warn("Failed to run a {} task. Transaction ID: {}", taskName, transactionId, e); | ||
|
|
||
| if (!stopOnError) { | ||
| exception = e; | ||
| if (exception == null) { | ||
| exception = e; | ||
| } else { | ||
| exception.addSuppressed(e); | ||
| } | ||
| } else { | ||
| throw e; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if (!stopOnError && exception != null) { | ||
| if (exception instanceof ExecutionException) { | ||
| throw (ExecutionException) exception; | ||
| } else if (exception instanceof ValidationConflictException) { | ||
| throw (ValidationConflictException) exception; | ||
| } else { | ||
| throw (CrudException) exception; | ||
| } | ||
| rethrow(exception); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This refactoring has removed the exception logging from executeTasksSerially. The previous implementation logged a warning when a task failed, which is important for observability and debugging. I recommend restoring this logging, including handling RuntimeException explicitly to make serial execution logging consistent with parallel execution.
private void executeTasksSerially(
List<ParallelExecutorTask> tasks, boolean stopOnError, String taskName, String transactionId)
throws ExecutionException, ValidationConflictException, CrudException {
Exception exception = null;
for (ParallelExecutorTask task : tasks) {
try {
task.run();
} catch (RuntimeException e) {
logger.warn("Failed to run a {} task. Transaction ID: {}", taskName, transactionId, e);
throw e;
} catch (ExecutionException | ValidationConflictException | CrudException e) {
logger.warn("Failed to run a {} task. Transaction ID: {}", taskName, transactionId, e);
if (!stopOnError) {
if (exception == null) {
exception = e;
} else {
exception.addSuppressed(e);
}
} else {
throw e;
}
}
}
if (!stopOnError && exception != null) {
rethrow(exception);
}
}| public void executeTasks_SingleTaskAndNoWaitTrue_ShouldUseParallelExecution() | ||
| throws ExecutionException, ValidationConflictException, CrudException { | ||
| // Arrange | ||
| when(config.isParallelPreparationEnabled()).thenReturn(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This mock setup for config.isParallelPreparationEnabled() is unnecessary because the executeTasks method doesn't use the config object for its parallelism behavior; it receives it via the parallel boolean parameter. Removing this line will make the test cleaner and easier to understand for future maintenance.
This is an automated backport of the following:
Please merge this PR after all checks have passed.