# flinkStreamSQL
> * Extends the real-time SQL of open-source Flink
> > * Custom CREATE TABLE syntax (covering source tables, result tables, and side/dimension tables)
> > * Custom CREATE FUNCTION syntax (see the sketch after this list)
> > * Joins between a stream and a side (dimension) table (see the full example in section 3)
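This excerpt does not show the custom CREATE FUNCTION grammar itself, so the following is a minimal illustrative sketch only; the keyword set, the function name, and the implementation class are all assumptions rather than the project's confirmed syntax (see the project docs for the actual grammar):

```
-- Illustrative sketch only: register a Java UDF class under a SQL name.
-- "SCALAR", "myLower" and com.example.udf.MyLower are assumed, not
-- taken from this project's grammar.
CREATE SCALAR FUNCTION myLower WITH com.example.udf.MyLower;

-- once registered, it could be used like a built-in:
-- SELECT myLower(channel) FROM MyTable;
```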
## 1 Quick Start
Build the project with Maven from the repository root:

    mvn clean package -Dmaven.test.skip
 * Required: no
 * Default: none
 * Optional parameters (see the combined sketch after this list):
   * sql.env.parallelism: default parallelism
   * sql.max.env.parallelism: maximum parallelism
   * time.characteristic: one of [ProcessingTime|IngestionTime|EventTime]
   * sql.checkpoint.interval: setting this enables checkpointing; interval in ms
   * sql.checkpoint.mode: one of [EXACTLY_ONCE|AT_LEAST_ONCE]
   * sql.checkpoint.timeout: checkpoint timeout in ms
   * sql.max.concurrent.checkpoints: maximum number of concurrent checkpoints
   * sql.checkpoint.cleanup.mode: by default checkpoints are not kept in external storage; [true (external checkpoint data is deleted once the job is cancelled) | false (external checkpoint data must be deleted manually)]
   * flinkCheckpointDataURI: external storage path for checkpoints, set to fit your environment, e.g. hdfs://, file://
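A minimal sketch of how several of these keys might be combined, assuming the launcher accepts them as a single JSON properties string (the HDFS path below is a placeholder):

```
{
    "time.characteristic": "EventTime",
    "sql.checkpoint.interval": "5000",
    "sql.checkpoint.mode": "EXACTLY_ONCE",
    "sql.checkpoint.cleanup.mode": "false",
    "flinkCheckpointDataURI": "hdfs://ns1/flink/checkpoints"
}
```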
* **flinkconf**
* **yarnconf**
  * Description: directory containing the Hadoop configuration files (both hdfs and yarn); not needed in standalone mode, e.g. /hadoop/etc/hadoop
  * Required: no
  * Default: none
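Putting the parameters above together, a hedged sketch of a yarn-mode submission; the submit.sh entry point and the -mode, -name, -sql and -confProp flags are assumptions not shown in this excerpt, and all paths are placeholders:

```
# sketch only: besides -flinkconf and -yarnconf, the flag names here
# are assumptions; adjust to the project's actual launcher
sh submit.sh \
    -mode yarn \
    -name pvJob \
    -sql /opt/jobs/pv.sql \
    -flinkconf /opt/flink-1.4.0/conf \
    -yarnconf /hadoop/etc/hadoop \
    -confProp "{\"time.characteristic\":\"EventTime\",\"sql.checkpoint.interval\":\"5000\"}"
```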
## 2 Structure
### 2.1 Source table plugins
* [kafka09 source table plugin](docs/kafka09Source.md)

### 2.2 Result table plugins
* [elasticsearch result table plugin](docs/elasticsearchSink.md)
* [hbase result table plugin](docs/hbaseSink.md)
* [mysql result table plugin](docs/mysqlSink.md)

### 2.3 Side (dimension) table plugins
* [hbase side table plugin](docs/hbaseSide.md)
* [mysql side table plugin](docs/mysqlSide.md)
## 3 Example

The script below reads from a kafka09 source table, joins the stream against an hbase side table, and writes the joined result into a mysql result table:
```
-- kafka09 source table; CHARACTER_LENGTH(channel) is a computed column
CREATE TABLE MyTable(
    name STRING,
    channel STRING,
    pv INT,
    xctime BIGINT,
    CHARACTER_LENGTH(channel) AS timeLeng
)WITH(
    type ='kafka09',
    bootstrapServers ='172.16.8.198:9092',
    zookeeperQuorum ='172.16.8.198:2181/kafka',
    offsetReset ='latest',
    topic ='nbTest1',
    parallelism ='1'
);

-- mysql result table
CREATE TABLE MyResult(
    channel VARCHAR,
    pv VARCHAR
)WITH(
    type ='mysql',
    url ='jdbc:mysql://172.16.8.104:3306/test?charset=utf8',
    userName ='dtstack',
    password ='abc123',
    tableName ='pv2',
    parallelism ='1'
);

-- hbase result table; logtime is cast to a TIMESTAMP column
CREATE TABLE workerinfo(
    CAST(logtime AS TIMESTAMP) AS rtime
)WITH(
    type ='hbase',
    zookeeperQuorum ='rdos1:2181',
    tableName ='workerinfo',
    rowKey ='ce,de',
    parallelism ='1',
    zookeeperParent ='/hbase'
);

-- hbase side (dimension) table with an LRU lookup cache;
-- PERIOD FOR SYSTEM_TIME marks it as a side table
CREATE TABLE sideTable(
    cf:name STRING AS name,
    cf:info STRING AS info,
    PRIMARY KEY(name),
    PERIOD FOR SYSTEM_TIME
)WITH(
    type ='hbase',
    zookeeperQuorum ='rdos1:2181',
    zookeeperParent ='/hbase',
    tableName ='workerinfo',
    cache ='LRU',
    cacheSize ='10000',
    cacheTTLMs ='60000',
    parallelism ='1'
);

-- join the stream with the side table and write the result to mysql
insert into MyResult
select
    d.channel,
    d.info
from
    (select
         a.*, b.info
     from MyTable a
     join sideTable b
       on a.channel = b.name
     where a.channel = 'xc2'
       and a.pv = 10
    ) as d;
```