88### 1.2 执行环境
99
1010* Java: JDK8及以上
11- * Flink集群: 1.4,1.5,1.8(单机模式不需要安装Flink集群)
11+ * Flink集群: 1.4,1.5,1.8,1.9,1.10 (单机模式不需要安装Flink集群)
1212* 操作系统:理论上不限
1313* kerberos环境需要在flink-conf.yaml配置security.kerberos.login.keytab以及security.kerberos.login.principal参数,配置案例:
1414```
@@ -22,42 +22,230 @@ zookeeper.sasl.service-name: zookeeper
2222zookeeper.sasl.login-context-name: Client
2323```
2424
25-
2625### 1.3 打包
27-
2826进入项目根目录,使用maven打包:
29-
3027```
3128mvn clean package -Dmaven.test.skip
32-
3329```
3430
3531####可运行的目录结构:
3632```
3733|
3834|-----bin
3935| |--- submit.sh 任务启动脚本
40- |-----lib
41- | |--- sql.launcher.jar 包存储路径,是任务提交的入口(需要手动移动到该目录)
36+ |-----lib: launcher包存储路径,是任务提交的入口
37+ | |--- sql.launcher.jar
4238|-----plugins: 插件包存储路径(mvn 打包之后会自动将jar移动到该目录下)
4339| |--- core.jar
4440| |--- xxxsource
4541| |--- xxxsink
4642| |--- xxxside
4743```
48- ### 1.4 启动
44+ ### 1.4 快速启动
4945
5046#### 1.4.1 启动命令
5147
48+ ``` shell script
49+ # 脚本启动
50+ sh submit.sh
51+ -mode yarn
52+ -name flink1.10_yarnSession
53+ -sql F:\d tstack\s tressTest\f linkStreamSql\1 .10_dev\s ql\f link1100.sql
54+ -localSqlPluginPath F:\d tstack\p roject\f linkStreamSQL\p lugins
55+ -remoteSqlPluginPath F:\d tstack\p roject\f linkStreamSQL\p lugins
56+ -flinkconf F:\d tstack\f link\f link-1.10.0\c onf
57+ -yarnconf F:\d tstack\f linkStreamSql\y arnConf_node1
58+ -flinkJarPath F:\d tstack\f link\f link-1.10.0\l ib
59+ -pluginLoadMode shipfile
60+ -confProp {\" time.characteristic\" :\" eventTime\" ,\" logLevel\" :\" info\" }
61+ -yarnSessionConf {\" yid\" :\" application_1586851105774_0014\" }
5262```
53- sh submit.sh -sql D:\sideSql.txt
54- -name xctest
55- -remoteSqlPluginPath /opt/dtstack/150_flinkplugin/sqlplugin
56- -localSqlPluginPath D:\gitspace\flinkStreamSQL\plugins
57- -addjar \["udf.jar\"\]
58- -mode yarn
59- -flinkconf D:\flink_home\kudu150etc
60- -yarnconf D:\hadoop\etc\hadoopkudu
61- -confProp \{\"time.characteristic\":\"EventTime\",\"sql.checkpoint.interval\":10000\}
62- -yarnSessionConf \{\"yid\":\"application_1564971615273_38182\"}
63- ```
63+ ``` shell script
64+ # 通过idea启动 程序入口类LaucherMain
65+ # Run/Debug Configurations中设置Program arguments
66+ -mode yarnPer
67+ -sql /home/wen/Desktop/flink_stream_sql_conf/sql/stressTest.sql
68+ -name stressTestAll
69+ -localSqlPluginPath /home/wen/IdeaProjects/flinkStreamSQL/plugins
70+ -remoteSqlPluginPath /home/wen/IdeaProjects/flinkStreamSQL/plugins
71+ -flinkconf /home/wen/Desktop/flink_stream_sql_conf/flinkConf
72+ -yarnconf /home/wen/Desktop/flink_stream_sql_conf/yarnConf_node1
73+ -flinkJarPath /home/wen/Desktop/dtstack/flink-1.8.1/lib
74+ -pluginLoadMode shipfile
75+ -confProp {\" time.characteristic\" :\" eventTime\" ,\" logLevel\" :\" info\" }
76+ -queue c
77+ ```
78+ #### 1.4.2 命令参数说明
79+ * ** mode**
80+ * 描述:执行模式,也就是flink集群的工作模式
81+ * local: 本地模式
82+ * standalone: 提交到独立部署模式的flink集群
83+ * yarn: 提交到yarn模式的flink集群,该模式下需要提前启动一个yarn-session,使用默认名"Flink session cluster"
84+ * yarnPer: yarn per_job模式提交(即创建新flink application),默认名为flink任务名称
85+ * 必选:否
86+ * 默认值:local
87+
88+ * ** name**
89+ * 描述:flink 任务对应名称。
90+ * 必选:是
91+ * 默认值:无
92+
93+ * ** sql**
94+ * 描述:待执行的flink sql所在路径 。
95+ * 必选:是
96+ * 默认值:无
97+
98+ * ** localSqlPluginPath**
99+ * 描述:本地插件根目录地址,也就是打包后产生的plugins目录。
100+ * 必选:是
101+ * 默认值:无
102+
103+ * ** remoteSqlPluginPath**
104+ * 描述:flink执行集群上的插件根目录地址(将打包好的插件存放到各个flink节点上,如果是yarn集群需要存放到所有的nodemanager上)。
105+ * 必选:否
106+ * 默认值:无
107+
108+ * ** addjar**
109+ * 描述:扩展jar路径,当前主要是UDF定义的jar;
110+ * 必选:否
111+ * 默认值:无
112+
113+ * ** confProp**
114+ * 描述:一些参数设置
115+ * 必选:否
116+ * 默认值:{}
117+ * 可选参数:
118+ * sql.ttl.min: 最小过期时间,大于0的整数,如1d、1h(d\D:天,h\H:小时,m\M:分钟,s\s:秒)
119+ * sql.ttl.max: 最大过期时间,大于0的整数,如2d、2h(d\D:天,h\H:小时,m\M:分钟,s\s:秒),需同时设置最小时间,且比最小时间大5分钟
120+ * state.backend: 任务状态后端,可选为MEMORY,FILESYSTEM,ROCKSDB,默认为flinkconf中的配置。
121+ * state.checkpoints.dir: FILESYSTEM,ROCKSDB状态后端文件系统存储路径,例如:hdfs://ns1/dtInsight/flink180/checkpoints。
122+ * state.backend.incremental: ROCKSDB状态后端是否开启增量checkpoint,默认为true。
123+ * sql.env.parallelism: 默认并行度设置
124+ * sql.max.env.parallelism: 最大并行度设置
125+ * time.characteristic: 可选值[ ProcessingTime|IngestionTime|EventTime]
126+ * sql.checkpoint.interval: 设置了该参数表明开启checkpoint(ms)
127+ * sql.checkpoint.mode: 可选值[ EXACTLY_ONCE|AT_LEAST_ONCE]
128+ * sql.checkpoint.timeout: 生成checkpoint的超时时间(ms)
129+ * sql.max.concurrent.checkpoints: 最大并发生成checkpoint数
130+ * sql.checkpoint.cleanup.mode: 默认是不会将checkpoint存储到外部存储,[ true(任务cancel之后会删除外部存储)|false(外部存储需要手动删除)]
131+ * flinkCheckpointDataURI: 设置checkpoint的外部存储路径,根据实际的需求设定文件路径,hdfs://, file://
132+ * jobmanager.memory.mb: per_job模式下指定jobmanager的内存大小(单位MB, 默认值:768)
133+ * taskmanager.memory.mb: per_job模式下指定taskmanager的内存大小(单位MB, 默认值:768)
134+ * taskmanager.num: per_job模式下指定taskmanager的实例数(默认1)
135+ * taskmanager.slots:per_job模式下指定每个taskmanager对应的slot数量(默认1)
136+ * savePointPath:任务恢复点的路径(默认无)
137+ * allowNonRestoredState:指示保存点是否允许非还原状态的标志(默认false)
138+ * restore.enable:是否失败重启(默认是true)
139+ * failure.interval:衡量失败率的时间段,单位分钟(默认6m)
140+ * delay.interval:连续两次重启尝试间的间隔,单位是秒(默认10s)
141+ * logLevel: 日志级别动态配置(默认info)
142+ * [prometheus 相关参数](docs/prometheus.md) per_job可指定metric写入到外部监控组件,以prometheus pushgateway举例
143+
144+
145+ * ** flinkconf**
146+ * 描述:flink配置文件所在的目录(单机模式下不需要),如/hadoop/flink-1.10.0/conf
147+ * 必选:否
148+ * 默认值:无
149+
150+ * ** yarnconf**
151+ * 描述:Hadoop配置文件(包括hdfs和yarn)所在的目录(单机模式下不需要),如/hadoop/etc/hadoop
152+ * 必选:否
153+ * 默认值:无
154+
155+ * ** flinkJarPath**
156+ * 描述:per_job 模式提交需要指定本地的flink jar存放路径
157+ * 必选:否
158+ * 默认值:无
159+
160+ * ** queue**
161+ * 描述:per_job 模式下指定的yarn queue
162+ * 必选:否
163+ * 默认值:default
164+
165+ * ** pluginLoadMode**
166+ * 描述:per_job 模式下的插件包加载方式。classpath:从每台机器加载插件包,shipfile:将需要插件从提交的节点上传到hdfs,不需要每台安装插件
167+ * 必选:否
168+ * 默认值:classpath
169+
170+ * ** yarnSessionConf**
171+ * 描述:yarn session 模式下指定的运行的一些参数,[可参考](https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html),目前只支持指定yid
172+ * 必选:否
173+ * 默认值:{}
174+
175+ ## 1.5 任务样例
176+ ```
177+ # 一个 kafka source join all维表 到 kafka sink的样例
178+ CREATE TABLE MyTable(
179+ id bigint,
180+ name varchar,
181+ address varchar
182+ )WITH(
183+ type = 'kafka10',
184+ bootstrapServers = '172.16.101.224:9092',
185+ zookeeperQuorm = '172.16.100.188:2181/kafka',
186+ offsetReset = 'latest',
187+ topic = 'tiezhu_test_in',
188+ groupId = 'flink_sql',
189+ timezone = 'Asia/Shanghai',
190+ topicIsPattern = 'false',
191+ parallelism = '1'
192+ );
193+
194+ CREATE TABLE sideTable(
195+ id bigint,
196+ school varchar,
197+ home varchar,
198+ PRIMARY KEY(id),
199+ PERIOD FOR SYSTEM_TIME
200+ )WITH(
201+ type='mysql',
202+ url='jdbc:mysql://172.16.8.109:3306/tiezhu',
203+ userName='dtstack',
204+ password='you-guess',
205+ tableName='stressTest',
206+ cache='ALL',
207+ parallelism='1',
208+ asyncCapacity='100'
209+ );
210+
211+ CREATE TABLE MyResult(
212+ id bigint,
213+ name varchar,
214+ address varchar,
215+ home varchar,
216+ school varchar
217+ )WITH(
218+ type = 'kafka10',
219+ bootstrapServers = '172.16.101.224:9092',
220+ zookeeperQuorm = '172.16.100.188:2181/kafka',
221+ offsetReset = 'latest',
222+ topic = 'tiezhu_test_out',
223+ parallelism = '1'
224+ );
225+
226+
227+ insert
228+ into
229+ MyResult
230+ select
231+ t1.id AS id,
232+ t1.name AS name,
233+ t1.address AS address,
234+ t2.school AS school,
235+ t2.home AS home
236+ from
237+ (
238+ select
239+ id,
240+ name,
241+ address
242+ from
243+ MyTable
244+ ) t1
245+ left join sideTable t2
246+ on t2.id = t2.id;
247+ ```
248+
249+ # 招聘
250+ 1.大数据平台开发工程师,想了解岗位详细信息可以添加本人微信号ysqwhiletrue,注明招聘,如有意者发送简历至
[email protected] 。
251+
0 commit comments