Skip to content

Commit 2fcaa46

Browse files
committed
[Improvement-17908][Flink] Make -sae parameter configurable with default enabled
Changes: - Make shutdownOnAttachedExit field configurable via UI switch - Change default behavior from disabled to enabled (null/true -> add -sae) - When explicitly disabled (false), no parameter is added, relying on Flink's default behavior - Update FlinkArgsUtils logic from Boolean.TRUE.equals() to !Boolean.FALSE.equals() - Add comprehensive test coverage for all scenarios (null, true, false) - Update JavaDoc to reflect new default behavior - Add UI switch control positioned after Yarn Queue field - Add Chinese and English i18n translations This change prevents resource leakage and duplicate tasks during worker failover by enabling cluster shutdown when CLI terminates abruptly (default behavior). The implementation uses a simple approach: - Enabled (default): add -sae parameter - Disabled: don't add any parameter (rely on Flink default)
1 parent fb9c968 commit 2fcaa46

File tree

5 files changed

+41
-28
lines changed

5 files changed

+41
-28
lines changed

dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,9 @@ public void testRunJarInApplicationMode() throws Exception {
6868
FlinkStreamParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION);
6969
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
7070

71-
// APPLICATION mode should NOT include -sae parameter (detached mode on YARN)
71+
// Default: shutdownOnAttachedExit is null (true), should include -sae
7272
Assertions.assertEquals(
73-
"${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar",
73+
"${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
7474
joinStringListWithSpace(commandLine));
7575
}
7676

@@ -81,25 +81,25 @@ public void testRunJarInClusterMode() throws Exception {
8181
List<String> commandLine1 =
8282
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
8383

84-
// Default: shutdownOnAttachedExit is null/false, should NOT include -sae
84+
// Default: shutdownOnAttachedExit is null (true), should include -sae
8585
Assertions.assertEquals(
86-
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar",
86+
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
8787
joinStringListWithSpace(commandLine1));
8888

8989
flinkParameters.setFlinkVersion("<1.10");
9090
List<String> commandLine2 =
9191
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
9292

9393
Assertions.assertEquals(
94-
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar",
94+
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
9595
joinStringListWithSpace(commandLine2));
9696

9797
flinkParameters.setFlinkVersion(">=1.12");
9898
List<String> commandLine3 =
9999
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
100100

101101
Assertions.assertEquals(
102-
"${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar",
102+
"${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
103103
joinStringListWithSpace(commandLine3));
104104
}
105105

@@ -108,9 +108,22 @@ public void testRunJarInLocalMode() throws Exception {
108108
FlinkStreamParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL);
109109
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
110110

111-
// Default: shutdownOnAttachedExit is null/false, should NOT include -sae
111+
// Default: shutdownOnAttachedExit is null (true), should include -sae
112112
Assertions.assertEquals(
113-
"${FLINK_HOME}/bin/flink run -p 4 -c org.example.Main /opt/job.jar",
113+
"${FLINK_HOME}/bin/flink run -p 4 -sae -c org.example.Main /opt/job.jar",
114+
joinStringListWithSpace(commandLine));
115+
}
116+
117+
@Test
118+
public void testRunJarWithShutdownOnAttachedExitDisabled() throws Exception {
119+
FlinkStreamParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.CLUSTER);
120+
flinkParameters.setShutdownOnAttachedExit(false); // Explicitly disable
121+
flinkParameters.setFlinkVersion(">=1.12");
122+
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
123+
124+
// When explicitly disabled, should NOT include -sae parameter (rely on Flink default)
125+
Assertions.assertEquals(
126+
"${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar",
114127
joinStringListWithSpace(commandLine));
115128
}
116129

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,9 @@ private static List<String> buildRunCommandLineForOthers(TaskExecutionContext ta
265265
}
266266

267267
// If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated
268-
// abruptly
268+
// abruptly. This prevents resource leakage and duplicate tasks during worker failover.
269269
// The task status will be synchronized with the cluster job status
270-
if (Boolean.TRUE.equals(flinkParameters.getShutdownOnAttachedExit())) {
270+
if (!Boolean.FALSE.equals(flinkParameters.getShutdownOnAttachedExit())) {
271271
args.add(FlinkConstants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae
272272
}
273273

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -117,13 +117,14 @@ public class FlinkParameters extends AbstractParameters {
117117
/**
118118
* Shutdown on attached exit (-sae parameter)
119119
*
120-
* <p>When enabled, Flink CLI will attempt to shutdown the cluster when the CLI
121-
* terminates abruptly. This is only suitable for attached mode (CLUSTER/LOCAL).
120+
* <p>When enabled (default), Flink CLI will attempt to shutdown the cluster when the CLI
121+
* terminates abruptly. This helps prevent resource leakage and duplicate tasks
122+
* during worker failover scenarios.
122123
*
123-
* <p>For APPLICATION mode, this should typically be disabled as the job runs
124-
* in detached mode on YARN.
124+
* <p>When disabled, no parameter is added and we rely on Flink's default behavior
125+
* (which is shutdown-on-attached-exit=false).
125126
*
126-
* <p>Default: false (disabled for safety)
127+
* <p>Default: true (enabled to prevent resource leakage during failover)
127128
*
128129
* @see FlinkArgsUtils#buildRunCommandLine
129130
*/

dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,9 @@ public void testRunJarInApplicationMode() throws Exception {
6868
FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.APPLICATION);
6969
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
7070

71-
// APPLICATION mode should NOT include -sae parameter (detached mode on YARN)
71+
// Default: shutdownOnAttachedExit is null (true), should include -sae
7272
Assertions.assertEquals(
73-
"${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar",
73+
"${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
7474
joinStringListWithSpace(commandLine));
7575
}
7676

@@ -81,25 +81,25 @@ public void testRunJarInClusterMode() {
8181
List<String> commandLine1 =
8282
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
8383

84-
// Default: shutdownOnAttachedExit is null/false, should NOT include -sae
84+
// Default: shutdownOnAttachedExit is null (true), should include -sae
8585
Assertions.assertEquals(
86-
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar",
86+
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
8787
joinStringListWithSpace(commandLine1));
8888

8989
flinkParameters.setFlinkVersion("<1.10");
9090
List<String> commandLine2 =
9191
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
9292

9393
Assertions.assertEquals(
94-
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar",
94+
"${FLINK_HOME}/bin/flink run -m yarn-cluster -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
9595
joinStringListWithSpace(commandLine2));
9696

9797
flinkParameters.setFlinkVersion(">=1.12");
9898
List<String> commandLine3 =
9999
FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
100100

101101
Assertions.assertEquals(
102-
"${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar",
102+
"${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
103103
joinStringListWithSpace(commandLine3));
104104
}
105105

@@ -108,9 +108,9 @@ public void testRunJarInLocalMode() {
108108
FlinkParameters flinkParameters = buildTestFlinkParametersWithDeployMode(FlinkDeployMode.LOCAL);
109109
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
110110

111-
// Default: shutdownOnAttachedExit is null/false, should NOT include -sae
111+
// Default: shutdownOnAttachedExit is null (true), should include -sae
112112
Assertions.assertEquals(
113-
"${FLINK_HOME}/bin/flink run -p 4 -c org.example.Main /opt/job.jar",
113+
"${FLINK_HOME}/bin/flink run -p 4 -sae -c org.example.Main /opt/job.jar",
114114
joinStringListWithSpace(commandLine));
115115
}
116116

@@ -121,7 +121,7 @@ public void testRunJarWithShutdownOnAttachedExitEnabled() {
121121
flinkParameters.setFlinkVersion(">=1.12");
122122
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
123123

124-
// When explicitly enabled, should include -sae parameter
124+
// When explicitly set to true (same as default), should include -sae parameter
125125
Assertions.assertEquals(
126126
"${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
127127
joinStringListWithSpace(commandLine));
@@ -134,7 +134,7 @@ public void testRunJarWithShutdownOnAttachedExitDisabled() {
134134
flinkParameters.setFlinkVersion(">=1.12");
135135
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
136136

137-
// When explicitly disabled, should NOT include -sae parameter
137+
// When explicitly disabled, should NOT include -sae parameter (rely on Flink default)
138138
Assertions.assertEquals(
139139
"${FLINK_HOME}/bin/flink run -t yarn-per-job -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -c org.example.Main /opt/job.jar",
140140
joinStringListWithSpace(commandLine));
@@ -146,8 +146,7 @@ public void testRunJarWithShutdownOnAttachedExitInApplicationMode() {
146146
flinkParameters.setShutdownOnAttachedExit(true); // Even if enabled
147147
List<String> commandLine = FlinkArgsUtils.buildRunCommandLine(buildTestTaskExecutionContext(), flinkParameters);
148148

149-
// APPLICATION mode with shutdownOnAttachedExit=true should include -sae
150-
// (User explicitly wants it, even though it may cause issues)
149+
// APPLICATION mode: when enabled, should include -sae (same as other modes)
151150
Assertions.assertEquals(
152151
"${FLINK_HOME}/bin/flink run-application -t yarn-application -ys 4 -ynm demo-app-name -yjm 1024m -ytm 1024m -p 4 -sae -c org.example.Main /opt/job.jar",
153152
joinStringListWithSpace(commandLine));

dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ export function useFlink(model: { [field: string]: any }): IJsonItem[] {
292292
'checked-value': true,
293293
'unchecked-value': false
294294
},
295-
value: false
295+
value: true
296296
},
297297
{
298298
type: 'input',

0 commit comments

Comments
 (0)