Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 26 additions & 22 deletions docs/en/command/usage.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,19 @@ Usage: start-seatunnel-spark-3-connector-v2.sh [options]
```bash
Usage: start-seatunnel-flink-13-connector-v2.sh [options]
Options:
--check Whether check config (default: false)
-c, --config Config file
-e, --deploy-mode Flink job deploy mode, support [run, run-application]
(default: run)
-h, --help Show the usage message
--master, --target Flink job submitted target master, support [local,
remote, yarn-session, yarn-per-job, kubernetes-session,
yarn-application, kubernetes-application]
-n, --name SeaTunnel job name (default: SeaTunnel)
-i, --variable Variable substitution, such as -i city=beijing, or -i
date=20190318 (default: [])
--check Whether check config (default: false)
-c, --config Config file
-e, --deploy-mode Flink job deploy mode, support [run, run-application]
(default: run)
-h, --help Show the usage message
--master, --target Flink job submitted target master, support [local,
remote, yarn-session, yarn-per-job, kubernetes-session,
yarn-application, kubernetes-application]
-s,--fromSavepoint <savepointPath>,--fromCheckpoint <checkpointPath> Path to a savepoint (or an externalized checkpoint) to
restore the job from (equivalent to "flink run -s" ).
-n, --name SeaTunnel job name (default: SeaTunnel)
-i, --variable Variable substitution, such as -i city=beijing, or -i
date=20190318 (default: [])
```

</TabItem>
Expand All @@ -118,17 +120,19 @@ Usage: start-seatunnel-flink-13-connector-v2.sh [options]
```bash
Usage: start-seatunnel-flink-15-connector-v2.sh [options]
Options:
--check Whether check config (default: false)
-c, --config Config file
-e, --deploy-mode Flink job deploy mode, support [run, run-application]
(default: run)
-h, --help Show the usage message
--master, --target Flink job submitted target master, support [local,
remote, yarn-session, yarn-per-job, kubernetes-session,
yarn-application, kubernetes-application]
-n, --name SeaTunnel job name (default: SeaTunnel)
-i, --variable Variable substitution, such as -i city=beijing, or -i
date=20190318 (default: [])
--check Whether check config (default: false)
-c, --config Config file
-e, --deploy-mode Flink job deploy mode, support [run, run-application]
(default: run)
-h, --help Show the usage message
--master, --target Flink job submitted target master, support [local,
remote, yarn-session, yarn-per-job, kubernetes-session,
yarn-application, kubernetes-application]
-s,--fromSavepoint <savepointPath>,--fromCheckpoint <checkpointPath> Path to a savepoint (or an externalized checkpoint) to
restore the job from (equivalent to "flink run -s" ).
-n, --name SeaTunnel job name (default: SeaTunnel)
-i, --variable Variable substitution, such as -i city=beijing, or -i
date=20190318 (default: [])
```

</TabItem>
Expand Down
8 changes: 8 additions & 0 deletions docs/en/other-engine/flink.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ sink{
}
```

### How to recover tasks from savepoint/checkpoint

When you need to recover a task from a savepoint/checkpoint, simply specify the relevant savepoint/checkpoint directory when starting the task(equivalent to `flink run -s`).

```
bin/start-seatunnel-flink-13-connector-v2.sh --target local --fromSavepoint hdfs:///flink/checkpoints/3c298a925d9a2a7837bbf5a8e4966b4f/chk-7902 --deploy-mode run --config ./config/example.conf
```

### How To Run A Job In A Project

After you pull the code to the local, go to the `seatunnel-examples/seatunnel-flink-connector-v2-example` module and find `org.apache.seatunnel.example.flink.v2.SeaTunnelApiExample` to complete the operation of the job.
48 changes: 26 additions & 22 deletions docs/zh/command/usage.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,19 @@ Usage: start-seatunnel-spark-3-connector-v2.sh [options]
```bash
Usage: start-seatunnel-flink-13-connector-v2.sh [options]
Options:
--check Whether check config (default: false)
-c, --config Config file
-e, --deploy-mode Flink job deploy mode, support [run, run-application]
(default: run)
-h, --help Show the usage message
--master, --target Flink job submitted target master, support [local,
remote, yarn-session, yarn-per-job, kubernetes-session,
yarn-application, kubernetes-application]
-n, --name SeaTunnel job name (default: SeaTunnel)
-i, --variable Variable substitution, such as -i city=beijing, or -i
date=20190318 (default: [])
--check Whether check config (default: false)
-c, --config Config file
-e, --deploy-mode Flink job deploy mode, support [run, run-application]
(default: run)
-h, --help Show the usage message
--master, --target Flink job submitted target master, support [local,
remote, yarn-session, yarn-per-job, kubernetes-session,
yarn-application, kubernetes-application]
-s,--fromSavepoint <savepointPath>,--fromCheckpoint <checkpointPath> Path to a savepoint (or an externalized checkpoint) to
restore the job from (equivalent to "flink run -s" ).
-n, --name SeaTunnel job name (default: SeaTunnel)
-i, --variable Variable substitution, such as -i city=beijing, or -i
date=20190318 (default: [])
```

</TabItem>
Expand All @@ -118,17 +120,19 @@ Usage: start-seatunnel-flink-13-connector-v2.sh [options]
```bash
Usage: start-seatunnel-flink-15-connector-v2.sh [options]
Options:
--check Whether check config (default: false)
-c, --config Config file
-e, --deploy-mode Flink job deploy mode, support [run, run-application]
(default: run)
-h, --help Show the usage message
--master, --target Flink job submitted target master, support [local,
remote, yarn-session, yarn-per-job, kubernetes-session,
yarn-application, kubernetes-application]
-n, --name SeaTunnel job name (default: SeaTunnel)
-i, --variable Variable substitution, such as -i city=beijing, or -i
date=20190318 (default: [])
--check Whether check config (default: false)
-c, --config Config file
-e, --deploy-mode Flink job deploy mode, support [run, run-application]
(default: run)
-h, --help Show the usage message
--master, --target Flink job submitted target master, support [local,
remote, yarn-session, yarn-per-job, kubernetes-session,
yarn-application, kubernetes-application]
-s,--fromSavepoint <savepointPath>,--fromCheckpoint <checkpointPath> Path to a savepoint (or an externalized checkpoint) to
restore the job from (equivalent to "flink run -s" ).
-n, --name SeaTunnel job name (default: SeaTunnel)
-i, --variable Variable substitution, such as -i city=beijing, or -i
date=20190318 (default: [])
```

</TabItem>
Expand Down
8 changes: 8 additions & 0 deletions docs/zh/other-engine/flink.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ sink{
}
```

### 如何从保存点/检查点恢复任务

当你需要从保存点/检查点恢复任务时,只需启动任务时指定相关的保存点/检查点目录即可(与`flink run -s`功能相同)。

```
bin/start-seatunnel-flink-13-connector-v2.sh --target local --fromSavepoint hdfs:///flink/checkpoints/3c298a925d9a2a7837bbf5a8e4966b4f/chk-7902 --deploy-mode run --config ./config/example.conf
```

### 如何在项目中运行Job

当你将代码拉到本地后,转到 `seatunnel-examples/seatunnel-flink-connector-v2-example` 模块,查找 `org.apache.seatunnel.example.flink.v2.SeaTunnelApiExample` 即可完成job的操作。
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,29 @@
<version>${project.version}</version>
</dependency>

<!-- test -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-e2e-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.1.15.3.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-fake</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.core.starter.flink;

import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;

import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.constants.EngineType;
import org.apache.seatunnel.core.starter.Starter;
Expand Down Expand Up @@ -56,6 +58,11 @@ public List<String> buildCommands() {
command.add("${FLINK_HOME}/bin/flink");
// set deploy mode, run or run-application
command.add(flinkCommandArgs.getDeployMode().getDeployMode());
// set restore checkpoint
if (StringUtils.isNoneBlank(flinkCommandArgs.getFromSavepoint())) {
command.add("-s");
command.add(flinkCommandArgs.getFromSavepoint());
}
// set submitted target master
if (flinkCommandArgs.getMasterType() != null) {
command.add("--target");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
+ "kubernetes-session, yarn-application, kubernetes-application]")
private MasterType masterType;

/** restore savepoint/checkpoint path */
@Parameter(
names = {"-s", "--fromSavepoint", "--fromCheckpoint"},
description =
"Path to a savepoint (or an externalized checkpoint) to restore the job from (equivalent to flink run -s ).")
protected String fromSavepoint;

@Override
public Command<?> buildCommand() {
Common.setDeployMode(getDeployMode());
Expand All @@ -75,6 +82,8 @@ public String toString() {
+ deployMode
+ ", masterType="
+ masterType
+ ", fromSavepoint="
+ fromSavepoint
+ ", configFile='"
+ configFile
+ '\''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.io.FileNotFoundException;
import java.net.URISyntaxException;
import java.util.List;

import static org.apache.seatunnel.api.options.ConnectorCommonOptions.PLUGIN_NAME;

Expand Down Expand Up @@ -63,4 +64,36 @@ private static FlinkCommandArgs buildFlinkCommandArgs(String configFile) {
flinkCommandArgs.setVariables(null);
return flinkCommandArgs;
}

@Test
public void testBuildFlinkCommandArgsWithSavePoint() {
FlinkStarter flinkStarter =
new FlinkStarter(
new String[] {
"--target", "local",
"--deploy-mode ", "run",
"--config ", "/config/fake_to_inmemory1.json",
"--fromCheckpoint ",
"hdfs:///flink/checkpoints/3c298a925d9a2a7837bbf5a8e4966b4f/chk-7902"
});
List<String> commands = flinkStarter.buildCommands();
Assertions.assertTrue(
commands.contains("-s") || commands.contains("--fromSavepoint"),
"The flink commands should include either `-s` or `--fromSavepoint`");
}

@Test
public void testBuildFlinkCommandArgsWithoutSavePoint() {
FlinkStarter flinkStarter =
new FlinkStarter(
new String[] {
"--target", "local",
"--deploy-mode ", "run",
"--config ", "/config/fake_to_inmemory1.json",
});
List<String> commands = flinkStarter.buildCommands();
Assertions.assertTrue(
!commands.contains("-s") && !commands.contains("--fromSavepoint"),
"The flink commands should not include either `-s` or `--fromSavepoint`");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.starter.flink.multitable.MultiTableSinkTest;
import org.apache.seatunnel.core.starter.flink.utils.EnvironmentUtil;
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
import org.apache.seatunnel.core.starter.utils.FileUtils;
Expand Down Expand Up @@ -51,7 +52,9 @@ public void testFlinkParameter() throws Exception {
flinkCommandArgs.setEncrypt(false);
flinkCommandArgs.setDecrypt(false);
flinkCommandArgs.setHelp(false);
flinkCommandArgs.setConfigFile("src/test/java/resources/test_flink_run_parameter.conf");

flinkCommandArgs.setConfigFile(
MultiTableSinkTest.getTestConfigFile("/test_flink_run_parameter.conf"));
flinkCommandArgs.setVariables(null);
Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
Config config = ConfigBuilder.of(configFile).getConfig("env");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.seatunnel.e2e.source.inmemory.InMemorySourceSplitEnumerator;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;

import java.io.FileNotFoundException;
Expand All @@ -36,7 +35,6 @@
import java.util.Collections;
import java.util.List;

@Order(1)
public class MultiTableSinkTest {

@Test
Expand Down