[FLINK-36581][cli] Allow passing Flink configuration by yaml job file #3918
base: master
Conversation
Force-pushed from 42bf034 to 18f57f6.
@lvyanquan @yuxiqian PTAL~
```diff
         org.apache.flink.configuration.Configuration flinkConfig) {
     return new FlinkPipelineComposer(
-            StreamExecutionEnvironment.getExecutionEnvironment(), true);
+            StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig), true);
```
This fixes the Flink configuration not taking effect in use-mini-cluster mode.
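For context, here is a minimal sketch, with assumed class and parameter names rather than the PR's exact code, of how the parsed YAML settings could be handed to the local environment so that use-mini-cluster mode honors them:

```java
// Minimal sketch (assumed names): build a Flink Configuration from the flat
// key/value pairs parsed out of the YAML block and pass it to the local
// execution environment, which is what makes the settings effective in
// use-mini-cluster mode.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Map;

public class MiniClusterConfigSketch {

    public static StreamExecutionEnvironment createLocalEnv(Map<String, String> flinkConfFromYaml) {
        Configuration flinkConfig = Configuration.fromMap(flinkConfFromYaml);
        return StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig);
    }
}
```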
Could you clarify the configuration priority hierarchy here?

Please add this to the documentation, since we can add configuration through …
Force-pushed from ca81bb3 to 522ab43.
```yaml
  name: MySQL to OceanBase Pipeline
  parallelism: 1
  flink:
    execution.checkpointing.interval: 2min
```
I added Flink parameters to the pipeline-connector demo, prompting users to configure Flink parameters in pipeline.yaml.
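For reference, a hedged sketch of how the demo's pipeline.yaml might look with the new block in place; connector options are elided, and the nesting of the `flink` key under `pipeline` is an assumption based on this diff:

```yaml
source:
  type: mysql
  # ... source options elided ...

sink:
  type: oceanbase
  # ... sink options elided ...

pipeline:
  name: MySQL to OceanBase Pipeline
  parallelism: 1
  flink:
    execution.checkpointing.interval: 2min
```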
Please help review again @lvyanquan @joyCurry30
| + "or the environment variable \"FLINK_HOME\". " | ||
| + "Please make sure Flink home is properly set. "); | ||
| } | ||
|
|
Move the methods for loading and merging the Flink config to the FlinkEnvironmentUtils class.
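As a rough illustration of that suggestion (class and method names here are hypothetical, not the PR's final code), the loading side could resolve FLINK_HOME and reuse the error message from the diff above:

```java
// Hypothetical loader sketch: resolve FLINK_HOME and load the cluster
// configuration from its conf/ directory. GlobalConfiguration picks up
// config.yaml or the legacy flink-conf.yaml, depending on the Flink version.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;

import java.nio.file.Paths;

public final class FlinkHomeConfigLoaderSketch {

    public static Configuration loadFromFlinkHome() {
        String flinkHome = System.getenv("FLINK_HOME");
        if (flinkHome == null || flinkHome.isEmpty()) {
            throw new IllegalStateException(
                    "Cannot find Flink home via the environment variable \"FLINK_HOME\". "
                            + "Please make sure Flink home is properly set.");
        }
        return GlobalConfiguration.loadConfiguration(Paths.get(flinkHome, "conf").toString());
    }
}
```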
joyCurry30 left a comment:
Thanks @MOBIN-F ! I left some comments and questions.
Submit the job to a Flink YARN Application cluster via the CLI.
```bash
cd /path/flink-cdc-*
./bin/flink-cdc.sh -t yarn-application -s hdfs:///flink/savepoint-1537 -Dexecution.checkpointing.interval=2min mysql-to-doris.yaml
```
I think it’s better to keep this example to show how to submit a job using the command line, and then show another YAML file for submitting jobs with Flink configurations.
We should recommend that users configure Flink parameters in YAML as much as possible; the command-line method is not intuitive and looks ugly. WDYT?
+1 to keep CLI mode examples. The checkpoint path is not a static configuration and is prone to change; hard-coding it in the YAML file is not better than dynamic CLI arguments.
```java
private static final String TRANSFORM_KEY = "transform";
private static final String PIPELINE_KEY = "pipeline";
private static final String MODEL_KEY = "model";
private static final String FLINK_KEY = "flink";
```
I don’t think “flink” is a good choice as a key. Would it be better to use “configuration” or something else instead? What do you think? @lvyanquan
Maybe `flink-config` can be used.
IIRC only runtime execution configurations can be dynamically overridden. What about naming it `execution-config`?
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java (outdated review thread, resolved)
Force-pushed from 522ab43 to 168ef80.
Force-pushed from 8e0165a to 7e04477.
Force-pushed from 7e04477 to 26f48c3.
Do we have plans to merge this PR in 3.4? I want to merge this PR in 3.4, WDYT? @lvyanquan
This pull request has been automatically marked as stale because it has not had recent activity for 120 days. It will be closed in 60 days if no further activity occurs.
This pull request has been closed because it has not had recent activity. You can reopen it if you want to continue your work, and anyone who is interested is encouraged to continue work on this pull request.
@MOBIN-F Would you like to continue working on this PR?

Yes, I will finish the remaining work next week~
Force-pushed from 26f48c3 to 4160459.
Force-pushed from 4160459 to 3c85f01.
@lvyanquan @yuxiqian please trigger CI, thanks~
yuxiqian left a comment:
Thanks for @MOBIN-F's quick response!
Just left some general comments here, please take a look when available :)
From the Chinese documentation (translated):

| local-time-zone | The job-level local time zone. | optional |
| execution.runtime-mode | The runtime mode of the pipeline, either STREAMING or BATCH; the default value is STREAMING. | optional |
| operator.uid.prefix | The prefix for operator UIDs in the pipeline. If not set, Flink generates a unique UID for each operator. Setting this parameter is recommended to provide stable and recognizable operator IDs, which helps with stateful upgrades, troubleshooting, and diagnostics in the Flink UI. | optional |
| flink-conf | Used to configure [Flink-related parameters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/). <br/>Flink parameter priority: config.yaml < job command-line < pipeline.yaml | optional |
Suggested change (Chinese docs, translated):
```diff
-| flink-conf | Used to configure [Flink-related parameters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/). <br/>Flink parameter priority: config.yaml < job command-line < pipeline.yaml | optional |
+| flink-conf | Used to configure [Flink parameters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/). <br/>Flink parameters take effect in the following priority order (high to low): CDC CLI arguments, Pipeline YAML block, Flink `config.yaml`. | optional |
```
Please specify if modern config.yaml or legacy flink-conf.yaml[1] is supported.
| `schema-operator.rpc-timeout` | The timeout time for SchemaOperator to wait downstream SchemaChangeEvent applying finished, the default value is 3 minutes. | optional |
| `operator.uid.prefix` | The prefix to use for all pipeline operator UIDs. If not set, all pipeline operator UIDs will be generated by Flink. It is recommended to set this parameter to ensure stable and recognizable operator UIDs, which can help with stateful upgrades, troubleshooting, and Flink UI diagnostics. | optional |
| flink-conf | Used to configure [Flink related parameters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/). <br/>Flink parameter priority: config.yaml < job command-line < pipeline.yaml | optional |
Suggested change:
```diff
-| flink-conf | Used to configure [Flink related parameters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/). <br/>Flink parameter priority: config.yaml < job command-line < pipeline.yaml | optional |
+| flink-conf | Used to configure [Flink related parameters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/). <br/> Flink configurations will take effect in the following order (high to low): CDC CLI arguments, YAML Pipeline config blocks, and Flink `config.yaml`. | optional |
```
| flink-conf | Used to configure [Flink related parameters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/). <br/>Flink parameter priority: config.yaml < job command-line < pipeline.yaml | optional |

NOTE: Whilst the above parameters are each individually optional, at least one of them must be specified. The `pipeline` section is mandatory and cannot be empty.
Revert accidental changes
```yaml
execution.target: yarn-session
yarn.application.id: {{YARN_APPLICATION_ID}}
execution.checkpointing.interval: 2min
#If you need to restore from a savepoint, configure the following parameters:
```
Suggested change:
```diff
-#If you need to restore from a savepoint, configure the following parameters:
+# If you need to restore from a savepoint, uncomment the next line:
```
```yaml
# If you need to restore from a savepoint, configure the following parameters
#execution.savepoint.path: hdfs:///flink/savepoint-1537
```
Suggested change (Chinese docs, translated):
```diff
-# If you need to restore from a savepoint, configure the following parameters
-#execution.savepoint.path: hdfs:///flink/savepoint-1537
+# To restore from a savepoint, configure the following parameter:
+# execution.savepoint.path: hdfs:///flink/savepoint-1537
```
From the Chinese documentation (translated):

| local-time-zone | The job-level local time zone. | optional |
| execution.runtime-mode | The runtime mode of the pipeline, either STREAMING or BATCH; the default value is STREAMING. | optional |
| operator.uid.prefix | The prefix for operator UIDs in the pipeline. If not set, Flink generates a unique UID for each operator. Setting this parameter is recommended to provide stable and recognizable operator IDs, which helps with stateful upgrades, troubleshooting, and diagnostics in the Flink UI. | optional |
| flink-conf | Used to configure [Flink-related parameters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/). <br/>Flink parameter priority: config.yaml < job command-line < pipeline.yaml | optional |
I don't think the YAML flink-conf block should have higher priority than CLI arguments. IIRC Flink SQL client and CLI arguments could override static config files, too.
A reasonable priority order might be CDC CLI arguments > YAML flink-conf block > Flink cluster config file.
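To make that ordering concrete, here is a hedged merge sketch (helper and parameter names are assumptions, not the PR's code); later writes win, so CLI arguments override both the YAML block and the cluster file:

```java
// Hypothetical merge helper illustrating the suggested priority:
// CDC CLI arguments > YAML flink-conf block > Flink cluster config file.
import org.apache.flink.configuration.Configuration;

import java.util.Map;

public final class FlinkConfigPrioritySketch {

    public static Configuration merge(
            Configuration clusterConfig,          // lowest priority: $FLINK_HOME config file
            Map<String, String> yamlFlinkConf,    // middle priority: pipeline.yaml flink-conf block
            Map<String, String> cliOverrides) {   // highest priority: CDC CLI -D arguments
        Configuration merged = new Configuration(clusterConfig);
        yamlFlinkConf.forEach(merged::setString); // YAML block overrides the cluster file
        cliOverrides.forEach(merged::setString);  // CLI arguments override everything else
        return merged;
    }
}
```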
```yaml
pipeline:
  name: MySQL to StarRocks Pipeline
  parallelism: 2
  flink-conf:
```
No need to modify the documents of every connector; it's optional anyway. It has been described in the pipeline concept page.
Please add a Pipeline E2E case to verify the priority of the Flink config, the YAML flink-conf block, and the CDC CLI arguments.
At present, Flink CDC only supports reading Flink configurations from the Flink conf files, but this approach is not user-friendly for multiple jobs that require different configurations. Allow passing Flink configuration by YAML job file, such as:
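For illustration, a hedged sketch of the kind of job file this is pointing at (connector options are elided, the `flink-conf` key name was still under discussion in the review, and the values simply reuse the examples shown earlier in this thread):

```yaml
source:
  type: mysql
  # ... source options elided ...

sink:
  type: doris
  # ... sink options elided ...

pipeline:
  name: MySQL to Doris Pipeline
  parallelism: 2
  flink-conf:
    execution.target: yarn-session
    yarn.application.id: {{YARN_APPLICATION_ID}}
    execution.checkpointing.interval: 2min
```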