From e9c5457eaedcb718ccd13e190404c85cb4d21598 Mon Sep 17 00:00:00 2001 From: lijie Date: Tue, 23 Dec 2025 19:13:36 +0800 Subject: [PATCH 1/6] [Feature][seatunnel core][seatunnel-flink-start] Filnk engine supports task restore from checkpoint upon startup(#10220) --- .../core/starter/flink/AbstractFlinkStarter.java | 7 +++++++ .../core/starter/flink/args/FlinkCommandArgs.java | 9 +++++++++ 2 files changed, 16 insertions(+) diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/AbstractFlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/AbstractFlinkStarter.java index bd2e29f026d..3bd6c151015 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/AbstractFlinkStarter.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/AbstractFlinkStarter.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.core.starter.flink; +import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils; + import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.constants.EngineType; import org.apache.seatunnel.core.starter.Starter; @@ -56,6 +58,11 @@ public List buildCommands() { command.add("${FLINK_HOME}/bin/flink"); // set deploy mode, run or run-application command.add(flinkCommandArgs.getDeployMode().getDeployMode()); + // set restore checkpoint + if (StringUtils.isNoneBlank(flinkCommandArgs.getFromSavepoint())) { + command.add("-s"); + command.add(flinkCommandArgs.getFromSavepoint()); + } // set submitted target master if (flinkCommandArgs.getMasterType() != null) { command.add("--target"); diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java index ff098b9df97..ff177fe88da 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java @@ -53,6 +53,13 @@ public class FlinkCommandArgs extends AbstractCommandArgs { + "kubernetes-session, yarn-application, kubernetes-application]") private MasterType masterType; + /** restore checkpoint path */ + @Parameter( + names = {"-s", "--fromSavepoint"}, + description = + "Path to a savepoint to restore the job from (for example, flink run -s hdfs:///flink/checkpoints/3c298a925d9a2a7837bbf5a8e4966b4f/chk-7902).") + protected String fromSavepoint; + @Override public Command buildCommand() { Common.setDeployMode(getDeployMode()); @@ -75,6 +82,8 @@ public String toString() { + deployMode + ", masterType=" + masterType + + ", fromSavepoint=" + + fromSavepoint + ", configFile='" + configFile + '\'' From dcef5ecfda13923ad3d88eae2da27f1282253f36 Mon Sep 17 00:00:00 2001 From: lijie Date: Wed, 31 Dec 2025 13:51:03 +0800 Subject: [PATCH 2/6] [Feature][seatunnel core][seatunnel-flink-start] Filnk engine supports task restore from checkpoint upon startup --- .../core/starter/flink/AbstractFlinkStarter.java | 4 ++-- .../core/starter/flink/args/FlinkCommandArgs.java | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/AbstractFlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/AbstractFlinkStarter.java index 3bd6c151015..34532ac126f 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/AbstractFlinkStarter.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/AbstractFlinkStarter.java @@ -59,9 +59,9 @@ public List buildCommands() { // set deploy mode, run or run-application command.add(flinkCommandArgs.getDeployMode().getDeployMode()); // set restore checkpoint - if (StringUtils.isNoneBlank(flinkCommandArgs.getFromSavepoint())) { + if (StringUtils.isNoneBlank(flinkCommandArgs.getFromCheckpoint())) { command.add("-s"); - command.add(flinkCommandArgs.getFromSavepoint()); + command.add(flinkCommandArgs.getFromCheckpoint()); } // set submitted target master if (flinkCommandArgs.getMasterType() != null) { diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java index ff177fe88da..079b4aec4d9 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java @@ -55,10 +55,10 @@ public class FlinkCommandArgs extends AbstractCommandArgs { /** restore checkpoint path */ @Parameter( - names = {"-s", "--fromSavepoint"}, + names = {"-s", "--fromCheckpoint"}, description = - "Path to a savepoint to restore the job from (for example, flink run -s hdfs:///flink/checkpoints/3c298a925d9a2a7837bbf5a8e4966b4f/chk-7902).") - protected String fromSavepoint; + "Path to a checkpoint to restore the job from (for example, flink run -s hdfs:///flink/checkpoints/3c298a925d9a2a7837bbf5a8e4966b4f/chk-7902).") + protected String fromCheckpoint; @Override public Command buildCommand() { @@ -82,8 +82,8 @@ public String toString() { + deployMode + ", masterType=" + masterType - + ", fromSavepoint=" - + fromSavepoint + + ", fromCheckpoint=" + + fromCheckpoint + ", configFile='" + configFile + '\'' From 0b5bebcea80371940c678002fe2f5dd57a03635d Mon Sep 17 00:00:00 2001 From: lijie Date: Wed, 31 Dec 2025 14:07:34 +0800 Subject: [PATCH 3/6] [Feature][seatunnel core][seatunnel-flink-start] Filnk engine supports task restore from checkpoint upon startup --- docs/en/engines/flink.md | 8 ++++++++ docs/zh/engines/flink.md | 8 ++++++++ 2 files changed, 16 insertions(+) diff --git a/docs/en/engines/flink.md b/docs/en/engines/flink.md index b6e7d6af77e..872464bd1e2 100644 --- a/docs/en/engines/flink.md +++ b/docs/en/engines/flink.md @@ -79,6 +79,14 @@ sink{ } ``` +### How to recover tasks from checkpoint + +When you need to recover a task from a checkpoint, simply specify the relevant checkpoint directory when starting the task. + +``` +bin/start-seatunnel-flink-13-connector-v2.sh --target local --fromCheckpoint hdfs:///flink/checkpoints/3c298a925d9a2a7837bbf5a8e4966b4f/chk-7902 --deploy-mode run --config ./config/example.conf +``` + ### How To Run A Job In A Project After you pull the code to the local, go to the `seatunnel-examples/seatunnel-flink-connector-v2-example` module and find `org.apache.seatunnel.example.flink.v2.SeaTunnelApiExample` to complete the operation of the job. diff --git a/docs/zh/engines/flink.md b/docs/zh/engines/flink.md index 06f51a82b46..577f530f071 100644 --- a/docs/zh/engines/flink.md +++ b/docs/zh/engines/flink.md @@ -78,6 +78,14 @@ sink{ } ``` +### 如何从检查点恢复任务 + +当你需要从检查点恢复任务时,只需启动任务时指定相关的检查点目录即可。 + +``` +bin/start-seatunnel-flink-13-connector-v2.sh --target local --fromCheckpoint hdfs:///flink/checkpoints/3c298a925d9a2a7837bbf5a8e4966b4f/chk-7902 --deploy-mode run --config ./config/example.conf +``` + ### 如何在项目中运行Job 当你将代码拉到本地后,转到 `seatunnel-examples/seatunnel-flink-connector-v2-example` 模块,查找 `org.apache.seatunnel.example.flink.v2.SeaTunnelApiExample` 即可完成job的操作。 From 9543092a895adf1a07f0944988fce44ae63ccd87 Mon Sep 17 00:00:00 2001 From: lijie Date: Tue, 20 Jan 2026 09:42:16 +0800 Subject: [PATCH 4/6] [Feature][seatunnel core][seatunnel-flink-start] Filnk engine supports task restore from checkpoint upon startup --- docs/en/engines/flink.md | 6 ++-- docs/zh/engines/flink.md | 6 ++-- .../seatunnel-flink-starter-common/pom.xml | 23 +++++++++++++ .../starter/flink/AbstractFlinkStarter.java | 4 +-- .../starter/flink/args/FlinkCommandArgs.java | 12 +++---- .../starter/flink/FlinkCommandArgsTest.java | 33 +++++++++++++++++++ .../starter/flink/TestFlinkParameter.java | 5 ++- .../flink/multitable/MultiTableSinkTest.java | 2 -- .../resources/config/fake_to_inmemory.json | 0 .../fake_to_inmemory_without_pluginname.json | 0 .../inmemory_to_inmemory_multi_table.conf | 0 .../resources/test_flink_run_parameter.conf | 0 12 files changed, 74 insertions(+), 17 deletions(-) rename seatunnel-core/seatunnel-flink-starter/{seatunnel-flink-15-starter => seatunnel-flink-starter-common}/src/test/java/org/apache/seatunnel/core/starter/flink/FlinkCommandArgsTest.java (66%) rename seatunnel-core/seatunnel-flink-starter/{seatunnel-flink-15-starter => seatunnel-flink-starter-common}/src/test/java/org/apache/seatunnel/core/starter/flink/multitable/MultiTableSinkTest.java (98%) rename seatunnel-core/seatunnel-flink-starter/{seatunnel-flink-15-starter => seatunnel-flink-starter-common}/src/test/resources/config/fake_to_inmemory.json (100%) rename seatunnel-core/seatunnel-flink-starter/{seatunnel-flink-15-starter => seatunnel-flink-starter-common}/src/test/resources/config/fake_to_inmemory_without_pluginname.json (100%) rename seatunnel-core/seatunnel-flink-starter/{seatunnel-flink-15-starter => seatunnel-flink-starter-common}/src/test/resources/config/inmemory_to_inmemory_multi_table.conf (100%) rename seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/{java => }/resources/test_flink_run_parameter.conf (100%) diff --git a/docs/en/engines/flink.md b/docs/en/engines/flink.md index 872464bd1e2..fef0366badd 100644 --- a/docs/en/engines/flink.md +++ b/docs/en/engines/flink.md @@ -79,12 +79,12 @@ sink{ } ``` -### How to recover tasks from checkpoint +### How to recover tasks from savepoint/checkpoint -When you need to recover a task from a checkpoint, simply specify the relevant checkpoint directory when starting the task. +When you need to recover a task from a savepoint/checkpoint, simply specify the relevant savepoint/checkpoint directory when starting the task(equivalent to `flink run -s`). ``` -bin/start-seatunnel-flink-13-connector-v2.sh --target local --fromCheckpoint hdfs:///flink/checkpoints/3c298a925d9a2a7837bbf5a8e4966b4f/chk-7902 --deploy-mode run --config ./config/example.conf +bin/start-seatunnel-flink-13-connector-v2.sh --target local --fromSavepoint hdfs:///flink/checkpoints/3c298a925d9a2a7837bbf5a8e4966b4f/chk-7902 --deploy-mode run --config ./config/example.conf ``` ### How To Run A Job In A Project diff --git a/docs/zh/engines/flink.md b/docs/zh/engines/flink.md index 577f530f071..ed7f23d464a 100644 --- a/docs/zh/engines/flink.md +++ b/docs/zh/engines/flink.md @@ -78,12 +78,12 @@ sink{ } ``` -### 如何从检查点恢复任务 +### 如何从保存点/检查点恢复任务 -当你需要从检查点恢复任务时,只需启动任务时指定相关的检查点目录即可。 +当你需要从保存点/检查点恢复任务时,只需启动任务时指定相关的保存点/检查点目录即可(与`flink run -s`功能相同)。 ``` -bin/start-seatunnel-flink-13-connector-v2.sh --target local --fromCheckpoint hdfs:///flink/checkpoints/3c298a925d9a2a7837bbf5a8e4966b4f/chk-7902 --deploy-mode run --config ./config/example.conf +bin/start-seatunnel-flink-13-connector-v2.sh --target local --fromSavepoint hdfs:///flink/checkpoints/3c298a925d9a2a7837bbf5a8e4966b4f/chk-7902 --deploy-mode run --config ./config/example.conf ``` ### 如何在项目中运行Job diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/pom.xml b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/pom.xml index ee8481127f5..d38c7a54ec6 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/pom.xml +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/pom.xml @@ -74,6 +74,29 @@ ${project.version} + + + org.apache.seatunnel + seatunnel-e2e-common + ${project.version} + test-jar + test + + + + org.apache.flink + flink-clients + ${flink.1.15.3.version} + test + + + + org.apache.seatunnel + connector-fake + ${project.version} + test + + diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/AbstractFlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/AbstractFlinkStarter.java index 34532ac126f..3bd6c151015 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/AbstractFlinkStarter.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/AbstractFlinkStarter.java @@ -59,9 +59,9 @@ public List buildCommands() { // set deploy mode, run or run-application command.add(flinkCommandArgs.getDeployMode().getDeployMode()); // set restore checkpoint - if (StringUtils.isNoneBlank(flinkCommandArgs.getFromCheckpoint())) { + if (StringUtils.isNoneBlank(flinkCommandArgs.getFromSavepoint())) { command.add("-s"); - command.add(flinkCommandArgs.getFromCheckpoint()); + command.add(flinkCommandArgs.getFromSavepoint()); } // set submitted target master if (flinkCommandArgs.getMasterType() != null) { diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java index 079b4aec4d9..4cabf31ce22 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java @@ -53,12 +53,12 @@ public class FlinkCommandArgs extends AbstractCommandArgs { + "kubernetes-session, yarn-application, kubernetes-application]") private MasterType masterType; - /** restore checkpoint path */ + /** restore savepoint/checkpoint path */ @Parameter( - names = {"-s", "--fromCheckpoint"}, + names = {"-s", "--fromSavepoint", "--fromCheckpoint"}, description = - "Path to a checkpoint to restore the job from (for example, flink run -s hdfs:///flink/checkpoints/3c298a925d9a2a7837bbf5a8e4966b4f/chk-7902).") - protected String fromCheckpoint; + "Path to a savepoint (or an externalized checkpoint) to restore the job from (equivalent to flink run -s ).") + protected String fromSavepoint; @Override public Command buildCommand() { @@ -82,8 +82,8 @@ public String toString() { + deployMode + ", masterType=" + masterType - + ", fromCheckpoint=" - + fromCheckpoint + + ", fromSavepoint=" + + fromSavepoint + ", configFile='" + configFile + '\'' diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/java/org/apache/seatunnel/core/starter/flink/FlinkCommandArgsTest.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/java/org/apache/seatunnel/core/starter/flink/FlinkCommandArgsTest.java similarity index 66% rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/java/org/apache/seatunnel/core/starter/flink/FlinkCommandArgsTest.java rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/java/org/apache/seatunnel/core/starter/flink/FlinkCommandArgsTest.java index 4b6330af408..cedbd70b54b 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/java/org/apache/seatunnel/core/starter/flink/FlinkCommandArgsTest.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/java/org/apache/seatunnel/core/starter/flink/FlinkCommandArgsTest.java @@ -28,6 +28,7 @@ import java.io.FileNotFoundException; import java.net.URISyntaxException; +import java.util.List; import static org.apache.seatunnel.api.options.ConnectorCommonOptions.PLUGIN_NAME; @@ -63,4 +64,36 @@ private static FlinkCommandArgs buildFlinkCommandArgs(String configFile) { flinkCommandArgs.setVariables(null); return flinkCommandArgs; } + + @Test + public void testBuildFlinkCommandArgsWithSavePoint() { + FlinkStarter flinkStarter = + new FlinkStarter( + new String[] { + "--target", "local", + "--deploy-mode ", "run", + "--config ", "/config/fake_to_inmemory1.json", + "--fromCheckpoint ", + "hdfs:///flink/checkpoints/3c298a925d9a2a7837bbf5a8e4966b4f/chk-7902" + }); + List commands = flinkStarter.buildCommands(); + Assertions.assertTrue( + commands.contains("-s") || commands.contains("--fromSavepoint"), + "The flink commands should include either `-s` or `--fromSavepoint`"); + } + + @Test + public void testBuildFlinkCommandArgsWithoutSavePoint() { + FlinkStarter flinkStarter = + new FlinkStarter( + new String[] { + "--target", "local", + "--deploy-mode ", "run", + "--config ", "/config/fake_to_inmemory1.json", + }); + List commands = flinkStarter.buildCommands(); + Assertions.assertTrue( + !commands.contains("-s") && !commands.contains("--fromSavepoint"), + "The flink commands should not include either `-s` or `--fromSavepoint`"); + } } diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/java/org/apache/seatunnel/core/starter/flink/TestFlinkParameter.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/java/org/apache/seatunnel/core/starter/flink/TestFlinkParameter.java index 6a30b86a8ed..75c9dcd8bac 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/java/org/apache/seatunnel/core/starter/flink/TestFlinkParameter.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/java/org/apache/seatunnel/core/starter/flink/TestFlinkParameter.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.common.config.DeployMode; import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs; +import org.apache.seatunnel.core.starter.flink.multitable.MultiTableSinkTest; import org.apache.seatunnel.core.starter.flink.utils.EnvironmentUtil; import org.apache.seatunnel.core.starter.utils.ConfigBuilder; import org.apache.seatunnel.core.starter.utils.FileUtils; @@ -51,7 +52,9 @@ public void testFlinkParameter() throws Exception { flinkCommandArgs.setEncrypt(false); flinkCommandArgs.setDecrypt(false); flinkCommandArgs.setHelp(false); - flinkCommandArgs.setConfigFile("src/test/java/resources/test_flink_run_parameter.conf"); + + flinkCommandArgs.setConfigFile( + MultiTableSinkTest.getTestConfigFile("/test_flink_run_parameter.conf")); flinkCommandArgs.setVariables(null); Path configFile = FileUtils.getConfigPath(flinkCommandArgs); Config config = ConfigBuilder.of(configFile).getConfig("env"); diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/java/org/apache/seatunnel/core/starter/flink/multitable/MultiTableSinkTest.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/java/org/apache/seatunnel/core/starter/flink/multitable/MultiTableSinkTest.java similarity index 98% rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/java/org/apache/seatunnel/core/starter/flink/multitable/MultiTableSinkTest.java rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/java/org/apache/seatunnel/core/starter/flink/multitable/MultiTableSinkTest.java index 2fcf40d4604..403a359b4cf 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/java/org/apache/seatunnel/core/starter/flink/multitable/MultiTableSinkTest.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/java/org/apache/seatunnel/core/starter/flink/multitable/MultiTableSinkTest.java @@ -25,7 +25,6 @@ import org.apache.seatunnel.e2e.source.inmemory.InMemorySourceSplitEnumerator; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import java.io.FileNotFoundException; @@ -36,7 +35,6 @@ import java.util.Collections; import java.util.List; -@Order(1) public class MultiTableSinkTest { @Test diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/fake_to_inmemory.json b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/resources/config/fake_to_inmemory.json similarity index 100% rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/fake_to_inmemory.json rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/resources/config/fake_to_inmemory.json diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/fake_to_inmemory_without_pluginname.json b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/resources/config/fake_to_inmemory_without_pluginname.json similarity index 100% rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/fake_to_inmemory_without_pluginname.json rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/resources/config/fake_to_inmemory_without_pluginname.json diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/resources/config/inmemory_to_inmemory_multi_table.conf similarity index 100% rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/resources/config/inmemory_to_inmemory_multi_table.conf diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/java/resources/test_flink_run_parameter.conf b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/resources/test_flink_run_parameter.conf similarity index 100% rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/java/resources/test_flink_run_parameter.conf rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/resources/test_flink_run_parameter.conf From 5c481539f715a8b78c8ad10b0b0fc747ed0b8b84 Mon Sep 17 00:00:00 2001 From: lijie Date: Wed, 28 Jan 2026 09:23:59 +0800 Subject: [PATCH 5/6] [Feature][seatunnel core][seatunnel-flink-start] Filnk engine supports task restore from checkpoint upon startup --- docs/en/engines/command/usage.mdx | 48 +++++++++++++++++-------------- docs/zh/engines/command/usage.mdx | 46 +++++++++++++++-------------- 2 files changed, 50 insertions(+), 44 deletions(-) diff --git a/docs/en/engines/command/usage.mdx b/docs/en/engines/command/usage.mdx index e3d82519cb5..2bfd49151af 100644 --- a/docs/en/engines/command/usage.mdx +++ b/docs/en/engines/command/usage.mdx @@ -99,17 +99,19 @@ Usage: start-seatunnel-spark-3-connector-v2.sh [options] ```bash Usage: start-seatunnel-flink-13-connector-v2.sh [options] Options: - --check Whether check config (default: false) - -c, --config Config file - -e, --deploy-mode Flink job deploy mode, support [run, run-application] - (default: run) - -h, --help Show the usage message - --master, --target Flink job submitted target master, support [local, - remote, yarn-session, yarn-per-job, kubernetes-session, - yarn-application, kubernetes-application] - -n, --name SeaTunnel job name (default: SeaTunnel) - -i, --variable Variable substitution, such as -i city=beijing, or -i - date=20190318 (default: []) + --check Whether check config (default: false) + -c, --config Config file + -e, --deploy-mode Flink job deploy mode, support [run, run-application] + (default: run) + -h, --help Show the usage message + --master, --target Flink job submitted target master, support [local, + remote, yarn-session, yarn-per-job, kubernetes-session, + yarn-application, kubernetes-application] + -s,--fromSavepoint ,--fromCheckpoint Path to a savepoint (or an externalized checkpoint) to + restore the job from (equivalent to "flink run -s" ). + -n, --name SeaTunnel job name (default: SeaTunnel) + -i, --variable Variable substitution, such as -i city=beijing, or -i + date=20190318 (default: []) ``` @@ -118,17 +120,19 @@ Usage: start-seatunnel-flink-13-connector-v2.sh [options] ```bash Usage: start-seatunnel-flink-15-connector-v2.sh [options] Options: - --check Whether check config (default: false) - -c, --config Config file - -e, --deploy-mode Flink job deploy mode, support [run, run-application] - (default: run) - -h, --help Show the usage message - --master, --target Flink job submitted target master, support [local, - remote, yarn-session, yarn-per-job, kubernetes-session, - yarn-application, kubernetes-application] - -n, --name SeaTunnel job name (default: SeaTunnel) - -i, --variable Variable substitution, such as -i city=beijing, or -i - date=20190318 (default: []) + --check Whether check config (default: false) + -c, --config Config file + -e, --deploy-mode Flink job deploy mode, support [run, run-application] + (default: run) + -h, --help Show the usage message + --master, --target Flink job submitted target master, support [local, + remote, yarn-session, yarn-per-job, kubernetes-session, + yarn-application, kubernetes-application] + -s,--fromSavepoint ,--fromCheckpoint Path to a savepoint (or an externalized checkpoint) to + restore the job from (equivalent to "flink run -s" ). + -n, --name SeaTunnel job name (default: SeaTunnel) + -i, --variable Variable substitution, such as -i city=beijing, or -i + date=20190318 (default: []) ``` diff --git a/docs/zh/engines/command/usage.mdx b/docs/zh/engines/command/usage.mdx index a667a425fbb..4594da7e62d 100644 --- a/docs/zh/engines/command/usage.mdx +++ b/docs/zh/engines/command/usage.mdx @@ -99,17 +99,18 @@ bin/start-seatunnel-flink-15-connector-v2.sh ```bash 用法: start-seatunnel-flink-13-connector-v2.sh [选项] 选项: - --check 是否检查配置 (默认: false) - -c, --config 配置文件 - -e, --deploy-mode Flink 作业部署模式,支持 [run, run-application] - (默认: run) - -h, --help 显示使用说明 - --master, --target Flink 作业提交目标 master,支持 [local, - remote, yarn-session, yarn-per-job, kubernetes-session, - yarn-application, kubernetes-application] - -n, --name SeaTunnel 作业名称 (默认: SeaTunnel) - -i, --variable 变量替换,例如 -i city=beijing,或 -i - date=20190318 (默认: []) + --check 是否检查配置 (默认: false) + -c, --config 配置文件 + -e, --deploy-mode Flink 作业部署模式,支持 [run, run-application] + (默认: run) + -h, --help 显示使用说明 + --master, --target Flink 作业提交目标 master,支持 [local, + remote, yarn-session, yarn-per-job, kubernetes-session, + yarn-application, kubernetes-application] + -s,--fromSavepoint ,--fromCheckpoint 从保存点(或检查点)恢复作业(相当于"flink run -s") + -n, --name SeaTunnel 作业名称 (默认: SeaTunnel) + -i, --variable 变量替换,例如 -i city=beijing,或 -i + date=20190318 (默认: []) ``` @@ -118,17 +119,18 @@ bin/start-seatunnel-flink-15-connector-v2.sh ```bash 用法: start-seatunnel-flink-15-connector-v2.sh [选项] 选项: - --check 是否检查配置 (默认: false) - -c, --config 配置文件 - -e, --deploy-mode Flink 作业部署模式,支持 [run, run-application] - (默认: run) - -h, --help 显示使用说明 - --master, --target Flink 作业提交目标 master,支持 [local, - remote, yarn-session, yarn-per-job, kubernetes-session, - yarn-application, kubernetes-application] - -n, --name SeaTunnel 作业名称 (默认: SeaTunnel) - -i, --variable 变量替换,例如 -i city=beijing,或 -i - date=20190318 (默认: []) + --check 是否检查配置 (默认: false) + -c, --config 配置文件 + -e, --deploy-mode Flink 作业部署模式,支持 [run, run-application] + (默认: run) + -h, --help 显示使用说明 + --master, --target Flink 作业提交目标 master,支持 [local, + remote, yarn-session, yarn-per-job, kubernetes-session, + yarn-application, kubernetes-application] + -s,--fromSavepoint ,--fromCheckpoint 从保存点(或检查点)恢复作业(相当于"flink run -s") + -n, --name SeaTunnel 作业名称 (默认: SeaTunnel) + -i, --variable 变量替换,例如 -i city=beijing,或 -i + date=20190318 (默认: []) ``` From bde996d64eabc1d1e6cfd10a04fca4bc2f313729 Mon Sep 17 00:00:00 2001 From: lijie Date: Wed, 28 Jan 2026 09:38:59 +0800 Subject: [PATCH 6/6] [Feature][seatunnel core][seatunnel-flink-start] Filnk engine supports task restore from checkpoint upon startup --- .../core/starter/flink/multitable/MultiTableSinkTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/java/org/apache/seatunnel/core/starter/flink/multitable/MultiTableSinkTest.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/java/org/apache/seatunnel/core/starter/flink/multitable/MultiTableSinkTest.java index 403a359b4cf..2fcf40d4604 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/java/org/apache/seatunnel/core/starter/flink/multitable/MultiTableSinkTest.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/test/java/org/apache/seatunnel/core/starter/flink/multitable/MultiTableSinkTest.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.e2e.source.inmemory.InMemorySourceSplitEnumerator; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import java.io.FileNotFoundException; @@ -35,6 +36,7 @@ import java.util.Collections; import java.util.List; +@Order(1) public class MultiTableSinkTest { @Test