-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Fix Flink IllegalThreadStateException on Java 8 #36730
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
3b9091d
57c07f6
1860c25
77c0bc6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,8 @@ | |
| import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; | ||
|
|
||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import org.apache.beam.runners.core.metrics.MetricsPusher; | ||
| import org.apache.beam.sdk.Pipeline; | ||
| import org.apache.beam.sdk.PipelineResult; | ||
|
|
@@ -52,6 +54,8 @@ class FlinkPipelineExecutionEnvironment { | |
| private static final Logger LOG = | ||
| LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class); | ||
|
|
||
| private static final Set<ThreadGroup> protectedThreadGroups = ConcurrentHashMap.newKeySet(); | ||
|
|
||
| private final FlinkPipelineOptions options; | ||
|
|
||
| /** | ||
|
|
@@ -143,6 +147,7 @@ public PipelineResult executePipeline() throws Exception { | |
| if (flinkBatchEnv != null) { | ||
| if (options.getAttachedMode()) { | ||
| JobExecutionResult jobExecutionResult = flinkBatchEnv.execute(jobName); | ||
| ensureFlinkCleanupComplete(flinkBatchEnv); | ||
| return createAttachedPipelineResult(jobExecutionResult); | ||
| } else { | ||
| JobClient jobClient = flinkBatchEnv.executeAsync(jobName); | ||
|
|
@@ -151,6 +156,7 @@ public PipelineResult executePipeline() throws Exception { | |
| } else if (flinkStreamEnv != null) { | ||
| if (options.getAttachedMode()) { | ||
| JobExecutionResult jobExecutionResult = flinkStreamEnv.execute(jobName); | ||
| ensureFlinkCleanupComplete(flinkStreamEnv); | ||
| return createAttachedPipelineResult(jobExecutionResult); | ||
| } else { | ||
| JobClient jobClient = flinkStreamEnv.executeAsync(jobName); | ||
|
|
@@ -161,6 +167,36 @@ public PipelineResult executePipeline() throws Exception { | |
| } | ||
| } | ||
|
|
||
| /** Prevents ThreadGroup destruction while Flink cleanup threads are still running. */ | ||
| private void ensureFlinkCleanupComplete(Object executionEnv) { | ||
| String javaVersion = System.getProperty("java.version"); | ||
| if (javaVersion == null || !javaVersion.startsWith("1.8")) { | ||
| return; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Busy wait is generally not desired but for band-aid fix I think it's fine. We should narrow the scenario that introduced 2 seconds latency
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay I agree we can narrow the scope to wait for batch jobs and detect the environment and apply it for local env.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry realize this parameter is actually useful. We can do this is more reliable than check getFlinkMaster below
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added the parameter |
||
| } | ||
|
|
||
| ThreadGroup currentThreadGroup = Thread.currentThread().getThreadGroup(); | ||
| if (currentThreadGroup == null) { | ||
| return; | ||
| } | ||
|
|
||
| protectedThreadGroups.add(currentThreadGroup); | ||
|
|
||
| Thread cleanupReleaser = | ||
| new Thread( | ||
| () -> { | ||
| try { | ||
| Thread.sleep(2000); // 2 seconds should be enough for Flink cleanup | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| } finally { | ||
| protectedThreadGroups.remove(currentThreadGroup); | ||
| } | ||
| }, | ||
| "FlinkCleanupReleaser"); | ||
| cleanupReleaser.setDaemon(true); | ||
| cleanupReleaser.start(); | ||
| } | ||
|
|
||
| private FlinkDetachedRunnerResult createDetachedPipelineResult( | ||
| JobClient jobClient, FlinkPipelineOptions options) { | ||
| LOG.info("Pipeline submitted in detached mode"); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is an executionEnv but not used. What is the consideration here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh i added for any future checks like stream or batching, or local env and so on.