1111* Flink集群: 1.4,1.5,1.8,1.9,1.10(单机模式不需要安装Flink集群)
1212* 操作系统:理论上不限
1313* kerberos环境需要在flink-conf.yaml配置security.kerberos.login.keytab以及security.kerberos.login.principal参数,配置案例:
14- ```
14+ ``` yaml
1515# 提交到hadoop环境一定要配置fs.hdfs.hadoopconf参数
1616fs.hdfs.hadoopconf : /Users/maqi/tmp/hadoopconf/hadoop_250
1717security.kerberos.login.use-ticket-cache : true
@@ -24,7 +24,7 @@ zookeeper.sasl.login-context-name: Client
2424
2525### 1.3 打包
2626进入项目根目录,使用maven打包:
27- ```
27+ ` ` ` shell script
2828mvn clean package -Dmaven.test.skip
2929```
3030
@@ -59,193 +59,23 @@ sh submit.sh
5959 -pluginLoadMode shipfile
6060 -confProp {\" time.characteristic\" :\" eventTime\" ,\" logLevel\" :\" info\" }
6161 -yarnSessionConf {\" yid\" :\" application_1586851105774_0014\" }
62- ```
63- ``` shell script
64- # 通过idea启动 程序入口类LaucherMain
65- # Run/Debug Configurations中设置Program arguments
66- -mode yarnPer
67- -sql /home/wen/Desktop/flink_stream_sql_conf/sql/stressTest.sql
68- -name stressTestAll
69- -localSqlPluginPath /home/wen/IdeaProjects/flinkStreamSQL/plugins
70- -remoteSqlPluginPath /home/wen/IdeaProjects/flinkStreamSQL/plugins
71- -flinkconf /home/wen/Desktop/flink_stream_sql_conf/flinkConf
72- -yarnconf /home/wen/Desktop/flink_stream_sql_conf/yarnConf_node1
73- -flinkJarPath /home/wen/Desktop/dtstack/flink-1.8.1/lib
74- -pluginLoadMode shipfile
75- -confProp {\" time.characteristic\" :\" eventTime\" ,\" logLevel\" :\" info\" }
76- -queue c
77- ```
78- #### 1.4.2 命令参数说明
79- * ** mode**
80- * 描述:执行模式,也就是flink集群的工作模式
81- * local: 本地模式
82- * standalone: 提交到独立部署模式的flink集群
83- * yarn: 提交到yarn模式的flink集群,该模式下需要提前启动一个yarn-session,使用默认名"Flink session cluster"
84- * yarnPer: yarn per_job模式提交(即创建新flink application),默认名为flink任务名称
85- * 必选:否
86- * 默认值:local
87-
88- * ** name**
89- * 描述:flink 任务对应名称。
90- * 必选:是
91- * 默认值:无
9262
93- * ** sql**
94- * 描述:待执行的flink sql所在路径 。
95- * 必选:是
96- * 默认值:无
97-
98- * ** localSqlPluginPath**
99- * 描述:本地插件根目录地址,也就是打包后产生的plugins目录。
100- * 必选:是
101- * 默认值:无
102-
103- * ** remoteSqlPluginPath**
104- * 描述:flink执行集群上的插件根目录地址(将打包好的插件存放到各个flink节点上,如果是yarn集群需要存放到所有的nodemanager上)。
105- * 必选:否
106- * 默认值:无
107-
108- * ** addjar**
109- * 描述:扩展jar路径,当前主要是UDF定义的jar;
110- * 必选:否
111- * 默认值:无
112-
113- * ** confProp**
114- * 描述:一些参数设置
115- * 必选:否
116- * 默认值:{}
117- * 可选参数:
118- * sql.ttl.min: 最小过期时间,大于0的整数,如1d、1h(d\D:天,h\H:小时,m\M:分钟,s\s:秒)
119- * sql.ttl.max: 最大过期时间,大于0的整数,如2d、2h(d\D:天,h\H:小时,m\M:分钟,s\s:秒),需同时设置最小时间,且比最小时间大5分钟
120- * state.backend: 任务状态后端,可选为MEMORY,FILESYSTEM,ROCKSDB,默认为flinkconf中的配置。
121- * state.checkpoints.dir: FILESYSTEM,ROCKSDB状态后端文件系统存储路径,例如:hdfs://ns1/dtInsight/flink180/checkpoints。
122- * state.backend.incremental: ROCKSDB状态后端是否开启增量checkpoint,默认为true。
123- * sql.env.parallelism: 默认并行度设置
124- * sql.max.env.parallelism: 最大并行度设置
125- * time.characteristic: 可选值[ ProcessingTime|IngestionTime|EventTime]
126- * sql.checkpoint.interval: 设置了该参数表明开启checkpoint(ms)
127- * sql.checkpoint.mode: 可选值[ EXACTLY_ONCE|AT_LEAST_ONCE]
128- * sql.checkpoint.timeout: 生成checkpoint的超时时间(ms)
129- * sql.max.concurrent.checkpoints: 最大并发生成checkpoint数
130- * sql.checkpoint.cleanup.mode: 默认是不会将checkpoint存储到外部存储,[ true(任务cancel之后会删除外部存储)|false(外部存储需要手动删除)]
131- * flinkCheckpointDataURI: 设置checkpoint的外部存储路径,根据实际的需求设定文件路径,hdfs://, file://
132- * jobmanager.memory.mb: per_job模式下指定jobmanager的内存大小(单位MB, 默认值:768)
133- * taskmanager.memory.mb: per_job模式下指定taskmanager的内存大小(单位MB, 默认值:768)
134- * taskmanager.num: per_job模式下指定taskmanager的实例数(默认1)
135- * taskmanager.slots:per_job模式下指定每个taskmanager对应的slot数量(默认1)
136- * savePointPath:任务恢复点的路径(默认无)
137- * allowNonRestoredState:指示保存点是否允许非还原状态的标志(默认false)
138- * restore.enable:是否失败重启(默认是true)
139- * failure.interval:衡量失败率的时间段,单位分钟(默认6m)
140- * delay.interval:连续两次重启尝试间的间隔,单位是秒(默认10s)
141- * logLevel: 日志级别动态配置(默认info)
142- * [prometheus 相关参数](docs/prometheus.md) per_job可指定metric写入到外部监控组件,以prometheus pushgateway举例
143-
144-
145- * ** flinkconf**
146- * 描述:flink配置文件所在的目录(单机模式下不需要),如/hadoop/flink-1.10.0/conf
147- * 必选:否
148- * 默认值:无
149-
150- * ** yarnconf**
151- * 描述:Hadoop配置文件(包括hdfs和yarn)所在的目录(单机模式下不需要),如/hadoop/etc/hadoop
152- * 必选:否
153- * 默认值:无
154-
155- * ** flinkJarPath**
156- * 描述:per_job 模式提交需要指定本地的flink jar存放路径
157- * 必选:否
158- * 默认值:无
159-
160- * ** queue**
161- * 描述:per_job 模式下指定的yarn queue
162- * 必选:否
163- * 默认值:default
164-
165- * ** pluginLoadMode**
166- * 描述:per_job 模式下的插件包加载方式。classpath:从每台机器加载插件包,shipfile:将需要插件从提交的节点上传到hdfs,不需要每台安装插件
167- * 必选:否
168- * 默认值:classpath
169-
170- * ** yarnSessionConf**
171- * 描述:yarn session 模式下指定的运行的一些参数,[可参考](https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html),目前只支持指定yid
172- * 必选:否
173- * 默认值:{}
174-
175- ## 1.5 任务样例
63+ # mode: 任务启动的模式
64+ # name: 本次任务名称
65+ # sql: 本次任务执行sql脚本
66+ # localSqPluginPath: 本地插件包根目录地址
67+ # remoteSqlPluginPath: flink执行集群上的插件根目录地址
68+ # flinkconf: flink配置文件所在目录(单机模式下不需要)
69+ # yarnconf: Hadoop配置文件(包括hdfs和yarn)所在目录
70+ # flinkJarPath: yarnPer模式提交需要指定本地的flink jar存放路径
71+ # pluginLoadMode:yarnPer模式下的插件包加载方式
72+ # confProp: 其他额外参数配置
73+ # yarnSessionConf: yarnSession模式下指定的运行参数,目前只支持指定yid
17674```
177- # 一个 kafka source join all维表 到 kafka sink的样例
178- CREATE TABLE MyTable(
179- id bigint,
180- name varchar,
181- address varchar
182- )WITH(
183- type = 'kafka10',
184- bootstrapServers = '172.16.101.224:9092',
185- zookeeperQuorm = '172.16.100.188:2181/kafka',
186- offsetReset = 'latest',
187- topic = 'tiezhu_test_in',
188- groupId = 'flink_sql',
189- timezone = 'Asia/Shanghai',
190- topicIsPattern = 'false',
191- parallelism = '1'
192- );
193-
194- CREATE TABLE sideTable(
195- id bigint,
196- school varchar,
197- home varchar,
198- PRIMARY KEY(id),
199- PERIOD FOR SYSTEM_TIME
200- )WITH(
201- type='mysql',
202- url='jdbc:mysql://172.16.8.109:3306/tiezhu',
203- userName='dtstack',
204- password='you-guess',
205- tableName='stressTest',
206- cache='ALL',
207- parallelism='1',
208- asyncCapacity='100'
209- );
75+ 参数具体细节请看[ 命令参数说明] ( ./config.md )
21076
211- CREATE TABLE MyResult(
212- id bigint,
213- name varchar,
214- address varchar,
215- home varchar,
216- school varchar
217- )WITH(
218- type = 'kafka10',
219- bootstrapServers = '172.16.101.224:9092',
220- zookeeperQuorm = '172.16.100.188:2181/kafka',
221- offsetReset = 'latest',
222- topic = 'tiezhu_test_out',
223- parallelism = '1'
224- );
225-
226-
227- insert
228- into
229- MyResult
230- select
231- t1.id AS id,
232- t1.name AS name,
233- t1.address AS address,
234- t2.school AS school,
235- t2.home AS home
236- from
237- (
238- select
239- id,
240- name,
241- address
242- from
243- MyTable
244- ) t1
245- left join sideTable t2
246- on t2.id = t2.id;
247- ```
77+ 任务sql详情请看[ 任务样例] ( ./demo.md )
24878
249- # 招聘
79+ ### 招聘
250801.大数据平台开发工程师,想了解岗位详细信息可以添加本人微信号ysqwhiletrue,注明招聘,如有意者发送简历至
[email protected] 。
25181
0 commit comments