Fix Flink IllegalThreadStateException on Java 8 #36730
Summary of Changes
Hello @aIbrahiim, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request addresses a known issue where Flink jobs running on Java 8 in attached mode could encounter an ...
Assigning reviewers: R: @damccorm added as fallback since no labels match configuration
  }

  /** Prevents ThreadGroup destruction while Flink cleanup threads are still running. */
  private void ensureFlinkCleanupComplete(Object executionEnv) {
There is an executionEnv parameter, but it is not used. What is the consideration here?
Ah, I added it for any future checks, such as streaming vs. batch, or a local environment, and so on.
  private void ensureFlinkCleanupComplete(Object executionEnv) {
    String javaVersion = System.getProperty("java.version");
    if (javaVersion == null || !javaVersion.startsWith("1.8")) {
      return;
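The version guard in the excerpt above can be tried in isolation. The following is a minimal sketch, not code from the PR: the class name JavaVersionGuard and the method isJava8 are hypothetical, and only the `java.version` prefix test is taken from the diff.

```java
// Hypothetical helper, not part of the PR: isolates the Java 8 check from the excerpt.
final class JavaVersionGuard {
  private JavaVersionGuard() {}

  /** Returns true when the given java.version string denotes Java 8, e.g. "1.8.0_392". */
  static boolean isJava8(String javaVersion) {
    // Java 8 and earlier report "1.x"; Java 9 and later report "9", "11.0.2", "17", ...
    return javaVersion != null && javaVersion.startsWith("1.8");
  }

  /** Convenience overload that reads the running JVM's java.version property. */
  static boolean isJava8() {
    return isJava8(System.getProperty("java.version"));
  }
}
```

Taking the version string as a parameter keeps the check testable without changing the JVM's system properties.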
Busy waiting is generally undesirable, but for a band-aid fix I think it's fine. We should narrow the scenarios in which the 2-second latency is introduced:
- This only affects in-memory Flink clusters, and only batch jobs (':runners:flink:1.19:validatesRunnerBatch'). Can we call the wait only for batch?
- Can we check whether FlinkPipelineOptions.getFlinkMaster is a "org.apache.flink.api.java.ExecutionEnvironment.LocalEnvironment" and only do the busy wait for the local environment?
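For readers unfamiliar with the pattern, a bounded polling wait of the kind discussed here might look like the sketch below. It is an illustration under assumptions, not the PR's implementation: the class BoundedThreadGroupWait, the method awaitNoActiveThreads, and the choice to poll ThreadGroup.activeCount() are all hypothetical.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not the PR's code: wait a bounded time for a ThreadGroup to drain.
final class BoundedThreadGroupWait {
  private BoundedThreadGroupWait() {}

  /**
   * Polls until the group reports no live threads or the timeout elapses.
   * Returns true if the group drained in time, false on timeout or interruption.
   */
  static boolean awaitNoActiveThreads(ThreadGroup group, long timeoutMillis, long pollMillis) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    // activeCount() is documented as an estimate, so this check is best-effort only.
    while (group.activeCount() > 0) {
      if (System.nanoTime() - deadline >= 0) {
        return false; // timed out while threads were still alive
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
    return true;
  }
}
```

The timeout caps the added latency (for example 2000 ms, matching the figure mentioned in the comment), and returning early once the group is empty means the full delay is paid only when threads are actually still running.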
Okay, I agree. We can narrow the scope so the wait applies only to batch jobs, and detect the environment so it applies only to the local env.
Sorry, I realize this parameter is actually useful. We can do
if (executionEnv instanceof org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.LocalStreamEnvironment || executionEnv instanceof org.apache.flink.api.java.ExecutionEnvironment.LocalEnvironment)
This is more reliable than checking getFlinkMaster below.
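The instanceof test suggested above needs the Flink classes on the compile-time classpath. Purely as an illustration, a dependency-free variant can compare class names up the superclass chain. The helper below is a hypothetical sketch, not what the PR does; the caller would supply the fully qualified names of the local environment classes, which should be verified against the Flink version in use (note that nested classes use `$` in their binary names).

```java
import java.util.Set;

// Hypothetical sketch: type check by class name, with no compile-time dependency on Flink.
final class EnvironmentTypeCheck {
  private EnvironmentTypeCheck() {}

  /**
   * Returns true if obj's class, or any of its superclasses, has one of the
   * given fully qualified (binary) class names. Interfaces are not inspected.
   */
  static boolean isInstanceOfAnyNamed(Object obj, Set<String> classNames) {
    if (obj == null) {
      return false;
    }
    for (Class<?> c = obj.getClass(); c != null; c = c.getSuperclass()) {
      if (classNames.contains(c.getName())) {
        return true;
      }
    }
    return false;
  }
}
```

A plain instanceof remains the simpler and safer choice whenever the classes are already available at compile time, as they are inside the Flink runner.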
Added the parameter
Is it okay that PreCommit Java PVR Flink Batch previously took ~2 min, and now it takes >1 hour?

I don't see a difference here. The last run on this PR took 31 min: https://github.com/apache/beam/actions/runs/19112042171 and a (non-cached) run on master took 30 min: https://github.com/apache/beam/actions/runs/19089630209 The 1 min runs happen when there is no code change and the tasks are skipped thanks to the Gradle cache.

I see, currently validatesPortableRunnerBatch is timing out.
I guess the timeout is related to some other issue, as validatesPortableRunnerBatch doesn't run on the direct Flink runner.

Got a successful run for PreCommit Java PVR Flink Batch on my runner in 37 minutes: https://github.com/aIbrahiim/beam/actions/runs/19117606320
Fixes: #32949
Successful Run: https://github.com/aIbrahiim/beam/actions/runs/19104037201/job/54582948548