diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendITCase.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendITCase.java index daefe430d55c9..4995a88532f5c 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendITCase.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendITCase.java @@ -21,6 +21,8 @@ import org.apache.flink.client.deployment.executors.LocalExecutor; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.configuration.StateRecoveryOptions; +import org.apache.flink.core.execution.RecoveryClaimMode; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -83,6 +85,23 @@ void configurationIsForwarded() throws Exception { assertThat(getStdoutString()).contains("Watermark interval is 42"); } + @Test + void configurationRestoreMode() throws Exception { + Configuration config = new Configuration(); + CustomCommandLine commandLine = new DefaultCLI(); + + config.set(StateRecoveryOptions.RESTORE_MODE, RecoveryClaimMode.CLAIM); + + CliFrontend cliFrontend = new CliFrontend(config, Collections.singletonList(commandLine)); + + cliFrontend.parseAndRun( + new String[] { + "run", "-c", TestingJob.class.getName(), CliFrontendTestUtils.getTestJarPath() + }); + + assertThat(getStdoutString()).contains("Restore mode is CLAIM"); + } + @Test void commandlineOverridesConfiguration() throws Exception { Configuration config = new Configuration(); @@ -138,6 +157,9 @@ public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); System.out.println( "Watermark interval is " + env.getConfig().getAutoWatermarkInterval()); + System.out.println( + "Restore mode is " + + env.getConfiguration().get(StateRecoveryOptions.RESTORE_MODE)); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java index e7bdb2fdeadfd..62de1fd61150b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java @@ -176,14 +176,17 @@ public static SavepointRestoreSettings forPath( public static void toConfiguration( final SavepointRestoreSettings savepointRestoreSettings, final Configuration configuration) { - configuration.set( - StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, - savepointRestoreSettings.allowNonRestoredState()); - configuration.set( - StateRecoveryOptions.RESTORE_MODE, savepointRestoreSettings.getRecoveryClaimMode()); - final String savepointPath = savepointRestoreSettings.getRestorePath(); - if (savepointPath != null) { - configuration.set(StateRecoveryOptions.SAVEPOINT_PATH, savepointPath); + if (!savepointRestoreSettings.equals(SavepointRestoreSettings.none())) { + configuration.set( + StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, + savepointRestoreSettings.allowNonRestoredState()); + configuration.set( + StateRecoveryOptions.RESTORE_MODE, + savepointRestoreSettings.getRecoveryClaimMode()); + final String savepointPath = savepointRestoreSettings.getRestorePath(); + if (savepointPath != null) { + configuration.set(StateRecoveryOptions.SAVEPOINT_PATH, savepointPath); + } } }