Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
Expand All @@ -29,8 +31,10 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
Expand All @@ -52,6 +56,8 @@ class FlinkPipelineExecutionEnvironment {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);

private static final Set<ThreadGroup> protectedThreadGroups = ConcurrentHashMap.newKeySet();

private final FlinkPipelineOptions options;

/**
Expand Down Expand Up @@ -143,6 +149,7 @@ public PipelineResult executePipeline() throws Exception {
if (flinkBatchEnv != null) {
if (options.getAttachedMode()) {
JobExecutionResult jobExecutionResult = flinkBatchEnv.execute(jobName);
ensureFlinkCleanupComplete(flinkBatchEnv);
return createAttachedPipelineResult(jobExecutionResult);
} else {
JobClient jobClient = flinkBatchEnv.executeAsync(jobName);
Expand All @@ -151,6 +158,7 @@ public PipelineResult executePipeline() throws Exception {
} else if (flinkStreamEnv != null) {
if (options.getAttachedMode()) {
JobExecutionResult jobExecutionResult = flinkStreamEnv.execute(jobName);
ensureFlinkCleanupComplete(flinkStreamEnv);
return createAttachedPipelineResult(jobExecutionResult);
} else {
JobClient jobClient = flinkStreamEnv.executeAsync(jobName);
Expand All @@ -161,6 +169,41 @@ public PipelineResult executePipeline() throws Exception {
}
}

/** Prevents ThreadGroup destruction while Flink cleanup threads are still running. */
private void ensureFlinkCleanupComplete(Object executionEnv) {
String javaVersion = System.getProperty("java.version");
if (javaVersion == null || !javaVersion.startsWith("1.8")) {
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Busy wait is generally not desired but for band-aid fix I think it's fine. We should narrow the scenario that introduced 2 seconds latency

  • this only affects in-memory flink clusters, and also only affects batch jobs (':runners:flink:1.19:validatesRunnerBatch'.). Can we call wait only for batch?

  • Can we check whether FlinkPipelineOptions.getFlinkMaster is a "org.apache.flink.api.java.ExecutionEnvironment.LocalEnvironment" and only do the busy wait for local environment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay I agree we can narrow the scope to wait for batch jobs and detect the environment and apply it for local env.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry realize this parameter is actually useful. We can do

if (executionEnv instanceof org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.LocalStreamEnvironment || executionEnv instanceof org.apache.flink.api.java.ExecutionEnvironment.LocalEnvironment)

this is more reliable than check getFlinkMaster below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the parameter

}

if (!(executionEnv instanceof LocalStreamEnvironment
|| executionEnv instanceof LocalEnvironment)) {
return;
}

ThreadGroup currentThreadGroup = Thread.currentThread().getThreadGroup();
if (currentThreadGroup == null) {
return;
}

protectedThreadGroups.add(currentThreadGroup);

Thread cleanupReleaser =
new Thread(
() -> {
try {
Thread.sleep(2000); // 2 seconds should be enough for Flink cleanup
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
protectedThreadGroups.remove(currentThreadGroup);
}
},
"FlinkCleanupReleaser");
cleanupReleaser.setDaemon(true);
cleanupReleaser.start();
}

private FlinkDetachedRunnerResult createDetachedPipelineResult(
JobClient jobClient, FlinkPipelineOptions options) {
LOG.info("Pipeline submitted in detached mode");
Expand Down
Loading