3 changes: 2 additions & 1 deletion docs/content.zh/docs/connectors/pipeline-connectors/doris.md
@@ -46,7 +46,8 @@ sink:

pipeline:
parallelism: 1

flink-conf:
execution.checkpointing.interval: 2min
```

## Connector Options
@@ -58,6 +58,8 @@ route:
pipeline:
name: MySQL to Elasticsearch Pipeline
parallelism: 2
flink-conf:
execution.checkpointing.interval: 2min
```

Pipeline Connector Options
2 changes: 2 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
@@ -57,6 +57,8 @@ sink:
pipeline:
name: MySQL to Kafka Pipeline
parallelism: 2
flink-conf:
execution.checkpointing.interval: 2min
```

Pipeline Connector Options
@@ -63,6 +63,8 @@ sink:
pipeline:
name: MySQL to MaxCompute Pipeline
parallelism: 2
flink-conf:
execution.checkpointing.interval: 2min
```

## Connector Options
2 changes: 2 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -75,6 +75,8 @@ sink:
pipeline:
name: MySQL to Doris Pipeline
parallelism: 4
flink-conf:
execution.checkpointing.interval: 2min
```

## Connector Options
@@ -57,6 +57,8 @@ sink:
pipeline:
name: MySQL to OceanBase Pipeline
parallelism: 1
flink-conf:
execution.checkpointing.interval: 2min
Contributor Author:

I added Flink parameters to the pipeline-connector demos, prompting users to configure Flink parameters in pipeline.yaml.

```
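As a sketch, a complete pipeline.yaml of the shape the demos above follow might look like this (hostnames, credentials, and table names here are illustrative, not from the PR; exact sink options vary per connector — see each connector's option table):

```yaml
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: "123456"
  tables: app_db.\.*

sink:
  type: oceanbase
  url: jdbc:mysql://localhost:2881/test
  username: root
  password: "123456"

pipeline:
  name: MySQL to OceanBase Pipeline
  parallelism: 1
  flink-conf:
    execution.checkpointing.interval: 2min
```

The `flink-conf` block is nested under `pipeline` and takes ordinary Flink configuration keys, so no separate Flink config file edit is needed for per-job settings such as the checkpoint interval.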

## Connector Options
2 changes: 2 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/paimon.md
@@ -58,6 +58,8 @@ sink:
pipeline:
name: MySQL to Paimon Pipeline
parallelism: 2
flink-conf:
execution.checkpointing.interval: 2min
```

Pipeline Connector Options
@@ -59,6 +59,8 @@ sink:
pipeline:
name: MySQL to StarRocks Pipeline
parallelism: 2
flink-conf:
execution.checkpointing.interval: 2min
```

## Connector Options
5 changes: 5 additions & 0 deletions docs/content.zh/docs/core-concept/data-pipeline.md
@@ -60,6 +60,8 @@ under the License.
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
flink-conf:
execution.checkpointing.interval: 2min
```

## With optional sections
@@ -106,6 +108,8 @@ under the License.
classpath: com.example.functions.AddOneFunctionClass
- name: format
classpath: com.example.functions.FormatFunctionClass
flink-conf:
execution.checkpointing.interval: 2min
```

# Pipeline Configurations
@@ -118,3 +122,4 @@ under the License.
| local-time-zone | The local time zone at job level. | optional |
| execution.runtime-mode | The runtime mode of the pipeline, either STREAMING or BATCH; the default value is STREAMING. | optional |
| operator.uid.prefix | The prefix for operator UIDs in the pipeline. If not set, Flink generates a unique UID for each operator. Setting this parameter is recommended to provide stable and recognizable operator IDs, which helps with stateful upgrades, troubleshooting, and diagnostics in the Flink UI. | optional |
| flink-conf | Used to configure [Flink related parameters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/). <br/>Flink parameter priority: config.yaml < job command-line < pipeline.yaml | optional |
Member
@yuxiqian yuxiqian Dec 11, 2025

Suggested change
| flink-conf | Used to configure [Flink related parameters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/). <br/>Flink parameter priority: config.yaml < job command-line < pipeline.yaml | optional |
| flink-conf | Used to configure [Flink parameters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/). <br/>Flink parameters take effect in the following priority order (high to low): CDC CLI command-line arguments, the Pipeline YAML block, then Flink `config.yaml`. | optional |

Member
@yuxiqian yuxiqian Dec 11, 2025

Please specify whether the modern config.yaml or the legacy flink-conf.yaml [1] is supported.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#flink-configuration-file

Member
I don't think the YAML flink-conf block should have higher priority than CLI arguments. IIRC Flink SQL client and CLI arguments could override static config files, too.

A reasonable priority order might be CDC CLI arguments > YAML flink-conf block > Flink cluster config file.
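The layering the reviewer proposes can be sketched as a simple ordered merge, where lower-priority sources are applied first and higher-priority sources overwrite them. The class and method names below are illustrative only, not Flink CDC API:

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigPriorityDemo {

    // Merge three configuration layers. Later putAll calls overwrite earlier
    // entries, so argument order encodes priority (low to high):
    // Flink cluster config file < YAML flink-conf block < CDC CLI arguments.
    static Map<String, String> merge(Map<String, String> clusterConf,
                                     Map<String, String> yamlFlinkConf,
                                     Map<String, String> cliArgs) {
        Map<String, String> merged = new HashMap<>(clusterConf);
        merged.putAll(yamlFlinkConf);
        merged.putAll(cliArgs);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> cluster = Map.of(
                "execution.checkpointing.interval", "10min",
                "parallelism.default", "1");
        Map<String, String> yaml = Map.of(
                "execution.checkpointing.interval", "2min");
        Map<String, String> cli = Map.of(
                "parallelism.default", "4");
        // YAML overrides the cluster default interval; the CLI overrides parallelism.
        System.out.println(merge(cluster, yaml, cli));
    }
}
```

Under this order a user's one-off CLI flag always wins over the committed pipeline.yaml, which in turn wins over cluster-wide defaults — matching how Flink's own SQL client and CLI treat static config files.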

9 changes: 8 additions & 1 deletion docs/content.zh/docs/deployment/yarn.md
@@ -126,7 +126,14 @@ sink:
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2

flink-conf:
rest.bind-port: {{REST_PORT}}
rest.address: {{NODE_IP}}
execution.target: yarn-session
yarn.application.id: {{YARN_APPLICATION_ID}}
execution.checkpointing.interval: 2min
#If you need to restore from a savepoint, configure the following parameters:
#execution.savepoint.path: hdfs:///flink/savepoint-1537
Comment on lines +135 to +136
Member

Suggested change
#If you need to restore from a savepoint, configure the following parameters:
#execution.savepoint.path: hdfs:///flink/savepoint-1537
# If you need to restore from a savepoint, configure the following parameters:
# execution.savepoint.path: hdfs:///flink/savepoint-1537

```

You can modify the configuration file as needed.
3 changes: 2 additions & 1 deletion docs/content/docs/connectors/pipeline-connectors/doris.md
@@ -46,7 +46,8 @@ sink:

pipeline:
parallelism: 1

flink-conf:
execution.checkpointing.interval: 2min
```

## Connector Options
@@ -58,6 +58,8 @@ route:
pipeline:
name: MySQL to Elasticsearch Pipeline
parallelism: 2
flink-conf:
execution.checkpointing.interval: 2min
```

Pipeline Connector Options
2 changes: 2 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/kafka.md
@@ -55,6 +55,8 @@ sink:
pipeline:
name: MySQL to Kafka Pipeline
parallelism: 2
flink-conf:
execution.checkpointing.interval: 2min
```

Pipeline Connector Options
@@ -63,6 +63,8 @@ sink:
pipeline:
name: MySQL to MaxCompute Pipeline
parallelism: 2
flink-conf:
execution.checkpointing.interval: 2min
```

## Connector Options
2 changes: 2 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -76,6 +76,8 @@ sink:
pipeline:
name: MySQL to Doris Pipeline
parallelism: 4
flink-conf:
execution.checkpointing.interval: 2min
```

## Connector Options
2 changes: 2 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/oceanbase.md
@@ -57,6 +57,8 @@ sink:
pipeline:
name: MySQL to OceanBase Pipeline
parallelism: 1
flink-conf:
execution.checkpointing.interval: 2min
```

## Connector Options
2 changes: 2 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/paimon.md
@@ -58,6 +58,8 @@ sink:
pipeline:
name: MySQL to Paimon Pipeline
parallelism: 2
flink-conf:
execution.checkpointing.interval: 2min
```

Pipeline Connector Options
2 changes: 2 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/starrocks.md
@@ -59,6 +59,8 @@ sink:
pipeline:
name: MySQL to StarRocks Pipeline
parallelism: 2
flink-conf:
Member
No need to modify documents of every connector, it's optional anyway. It has been described in the pipeline concept page.

execution.checkpointing.interval: 2min
```

## Connector Options
6 changes: 5 additions & 1 deletion docs/content/docs/core-concept/data-pipeline.md
@@ -106,6 +108,8 @@ We could use following yaml file to define a complicated Data Pipeline describin
classpath: com.example.functions.AddOneFunctionClass
- name: format
classpath: com.example.functions.FormatFunctionClass
flink-conf:
execution.checkpointing.interval: 2min
```

# Pipeline Configurations
@@ -124,6 +126,8 @@ Note that whilst the parameters are each individually optional, at least one of
| `schema.operator.uid` | The unique ID for schema operator. This ID will be used for inter-operator communications and must be unique across operators. **Deprecated**: use `operator.uid.prefix` instead. | optional |
| `schema-operator.rpc-timeout` | The timeout time for SchemaOperator to wait downstream SchemaChangeEvent applying finished, the default value is 3 minutes. | optional |
| `operator.uid.prefix` | The prefix to use for all pipeline operator UIDs. If not set, all pipeline operator UIDs will be generated by Flink. It is recommended to set this parameter to ensure stable and recognizable operator UIDs, which can help with stateful upgrades, troubleshooting, and Flink UI diagnostics. | optional |

| flink-conf | Used to configure [Flink related parameters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/). <br/>Flink parameter priority: config.yaml < job command-line < pipeline.yaml | optional |
Member
Suggested change
| flink-conf | Used to configure [Flink related parameters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/). <br/>Flink parameter priority: config.yaml < job command-line < pipeline.yaml | optional |
| flink-conf | Used to configure [Flink related parameters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/). <br/> Flink configurations will take effect in the following order (high to low): CDC CLI arguments, YAML Pipeline config blocks, and Flink `config.yaml`. | optional |

NOTE: Whilst the above parameters are each individually optional, at least one of them must be specified. The `pipeline` section is mandatory and cannot be empty.
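Combining several of the options from the table above, a `pipeline` block might look like the following sketch (all values illustrative):

```yaml
pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2
  local-time-zone: Asia/Shanghai
  operator.uid.prefix: sync-mysql-to-doris
  flink-conf:
    execution.checkpointing.interval: 2min
```

A named `operator.uid.prefix` keeps operator IDs stable across job restarts, while `flink-conf` keeps per-job Flink settings alongside the pipeline definition instead of in the cluster config.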


Member
Revert accidental changes


9 changes: 8 additions & 1 deletion docs/content/docs/deployment/yarn.md
@@ -128,7 +128,14 @@ sink:
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2

flink-conf:
rest.bind-port: {{REST_PORT}}
rest.address: {{NODE_IP}}
execution.target: yarn-session
yarn.application.id: {{YARN_APPLICATION_ID}}
execution.checkpointing.interval: 2min
#If you need to restore from a savepoint, configure the following parameters:
Member
Suggested change
#If you need to restore from a savepoint, configure the following parameters:
# If you need to restore from a savepoint, uncomment the next line:

#execution.savepoint.path: hdfs:///flink/savepoint-1537
```

You need to modify the configuration file according to your needs.
@@ -74,7 +74,8 @@ public PipelineExecution.ExecutionInfo run() throws Exception {
case YARN_APPLICATION:
return deployWithApplicationComposer(new YarnApplicationDeploymentExecutor());
case LOCAL:
return deployWithComposer(FlinkPipelineComposer.ofMiniCluster());
return deployWithComposer(
FlinkPipelineComposer.ofMiniCluster(flinkConfig, additionalJars));
case REMOTE:
case YARN_SESSION:
return deployWithComposer(