Improve ExecutorService lifecycle handling by moving shutdown to finally block #3166
base: master
Code Review
This pull request correctly identifies the need to ensure the ExecutorService is always shut down and moves the shutdown logic to a finally block. This is a good improvement for resource safety. However, the change in its current form introduces a critical race condition by calling processFooter before awaitTermination. This could lead to corrupted output, especially for JSON files. I have left comments with suggestions to correct the order of operations, ensuring the footer is written only after all export tasks have completed.
    logger.error("Timeout occurred while waiting for tasks to complete");
    // TODO: handle this
}
processFooter(exportOptions, tableMetadata, bufferedWriter);
This call to processFooter introduces a race condition. It's now executed before the tasks in the ExecutorService are guaranteed to be complete. For formats like JSON, this can lead to a corrupted output file (e.g., a closing ] written before all objects). This line should be removed. The call should be moved into the finally block after awaitTermination succeeds, as I've suggested in another comment.
I have made changes based on feedback in e84bc49.
try {
    if (executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
        logger.info("All tasks completed");
    } else {
        logger.error("Timeout occurred while waiting for tasks to complete");
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    logger.error("Interrupted while waiting for executor termination", e);
}
As mentioned in my other comment, processFooter should be called here, after awaitTermination has successfully completed. This ensures all data has been written before the footer is added. The suggestion below moves the call here and also adds the necessary handling for IOException, which processFooter can throw.
try {
    if (executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
        logger.info("All tasks completed");
        processFooter(exportOptions, tableMetadata, bufferedWriter);
    } else {
        logger.error("Timeout occurred while waiting for tasks to complete");
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    logger.error("Interrupted while waiting for executor termination", e);
} catch (IOException e) {
    logger.error("Error writing footer: ", e);
}
I have made changes based on feedback in e84bc49.
@gemini-code-assist @inv-jishnu I think there's still a potential race condition.
Even after awaitTermination() times out and Thread.currentThread().interrupt() is called, busy threads might still be running. If those threads are still executing when processFooter() is called, we could have data corruption.
How about guarding processFooter() with executorService.isTerminated() to ensure all threads have actually completed?
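A minimal sketch of that guard, not the PR's actual code: `FooterGuardSketch`, `finish`, and `writeFooter` are hypothetical names, and `writeFooter` only stands in for `processFooter()` by appending a closing `]` to an in-memory buffer. The footer is written only when `isTerminated()` reports that the pool has been shut down and every task has finished.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FooterGuardSketch {
    // Hypothetical stand-in for processFooter(): closes a JSON array.
    static void writeFooter(StringBuffer out) {
        out.append("]");
    }

    // Shuts the pool down, waits up to timeoutMillis, and writes the footer
    // only if every task has finished. Returns true if the footer was written.
    static boolean finish(ExecutorService executor, StringBuffer out, long timeoutMillis) {
        executor.shutdown();
        try {
            executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // isTerminated() is true only after shutdown and once all tasks have completed,
        // so a timeout or an interrupt while workers are still busy skips the footer.
        if (executor.isTerminated()) {
            writeFooter(out);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        StringBuffer out = new StringBuffer("[");
        executor.execute(() -> out.append("{\"id\":1}"));
        // prints: true [{"id":1}]
        System.out.println(finish(executor, out, 5_000) + " " + out);
    }
}
```

With this shape, the timeout path and the interrupted path both fall through to the same check, so neither can write the footer while a worker is still appending data.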
You've raised a very important and valid point, @komamitsu. There is indeed a potential race condition in the current implementation regarding processFooter().
My initial suggestion (in comment #1) aimed to address this by placing processFooter() inside the if (executorService.awaitTermination(...)) block. This would ensure that the footer is only processed if all tasks completed successfully within the given timeout. If awaitTermination() returns true, it implies that executorService.isTerminated() would also be true at that point, meaning all tasks have indeed completed.
However, in the current code, processFooter() has been moved to be called unconditionally after the awaitTermination block (at lines 124-127 in the full patch). This reintroduces the race condition you've identified: if awaitTermination() times out (returns false) or an InterruptedException occurs, processFooter() will still be called while tasks might still be running or in an inconsistent state, potentially leading to data corruption.
To resolve this, processFooter() should be moved back into the if (executorService.awaitTermination(...)) block, similar to my original suggestion. This ensures it's only executed when all tasks are confirmed to have completed. The IOException handling for processFooter should also be included within that conditional block.
Since the processFooter call is now outside the lines highlighted by this comment (lines 112-121), I cannot provide a direct code suggestion here. However, the necessary change is to move the try-catch block for processFooter (currently at lines 124-127) to be immediately after logger.info("All tasks completed"); (line 115), and ensure its IOException is handled there.
@komamitsu san,
I have moved the processFooter section inside the if condition for executorService.awaitTermination(...), as suggested, in 27b5484.
Thank you.
@inv-jishnu Let me ask this just in case. Is it okay to always call processFooter() even when something fails (e.g., fetchDataChunk() throws an exception)?
Description
This PR updates the startExport method to ensure proper and reliable shutdown of the ExecutorService used during data export. I have created this PR based on @komamitsu san's comment on another PR (#3146).
Previously, the executor was shut down only after the export loop completed successfully. If an exception occurred before reaching that point, the executor would never be shut down, potentially causing thread leaks and keeping the JVM alive longer than intended.
The updated implementation moves the executor shutdown and termination logic into a finally block, ensuring that the executor is shut down in all scenarios.
This fix improves resource safety, prevents thread leaks, and aligns the code with best practices for managing thread pools.
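The pattern described above can be sketched as follows. This is an illustration under stated assumptions rather than the code in this PR: `ShutdownInFinallySketch` and `export` are hypothetical names, the `exportLoop` argument stands in for the loop that fetches chunks and submits tasks, and the 10-second timeout is arbitrary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownInFinallySketch {
    // Runs the (hypothetical) export loop and always shuts the pool down afterwards,
    // whether the loop returns normally or throws.
    static void export(ExecutorService executor, Runnable exportLoop) {
        try {
            exportLoop.run();
        } finally {
            executor.shutdown(); // stop accepting new tasks
            try {
                if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                    executor.shutdownNow(); // timed out: ask running tasks to stop
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                executor.shutdownNow();
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            export(executor, () -> {
                throw new IllegalStateException("simulated fetch failure");
            });
        } catch (IllegalStateException e) {
            // prints: shut down after failure: true
            System.out.println("shut down after failure: " + executor.isShutdown());
        }
    }
}
```

Because the shutdown lives in the finally block, the simulated failure still propagates to the caller, but no worker threads are left behind to keep the JVM alive.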
Related issues and/or PRs
NA
Changes made
Checklist
Additional notes (optional)
NA
Release notes
Improve ExecutorService lifecycle handling by moving shutdown to finally block