Skip to content

Commit d3babe5

Browse files
committed
编辑readme.md
1 parent 3dd8dd1 commit d3babe5

File tree

3 files changed

+99
-5
lines changed

3 files changed

+99
-5
lines changed

README.md

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,94 @@
11
# flinkStreamSQL
2-
基于开源的flink,对其实时sql进行扩展;主要实现了流与维表的join。
2+
> * 基于开源的flink,对其实时sql进行扩展
3+
> > 自定义create table 语法(包括输入源表,输出表,维表)
4+
> > 自定义create function 语法
5+
> > 实现了流与维表的join
6+
7+
8+
## 1 快速起步
9+
### 1.1 运行模式
10+
11+
12+
* 单机模式:对应Flink集群的单机模式
13+
* standalone模式:对应Flink集群的分布式模式
14+
* yarn模式:对应Flink集群的yarn模式
15+
16+
### 1.2 执行环境
17+
18+
* Java: JDK8及以上
19+
* Flink集群: 1.4(单机模式不需要安装Flink集群)
20+
* 操作系统:理论上不限
21+
22+
### 1.3 打包
23+
24+
进入项目根目录,使用maven打包:
25+
26+
```
27+
mvn clean package -Dmaven.test.skip
28+
```
29+
30+
打包结束后,项目根目录下会产生plugins目录,plugins目录下存放编译好的数据同步插件包
31+
32+
33+
### 1.4 启动
34+
35+
#### 1.4.1 命令行参数选项
36+
37+
* **model**
38+
* 描述:执行模式,也就是flink集群的工作模式
39+
* local: 本地模式
40+
* standalone: 独立部署模式的flink集群
41+
* yarn: yarn模式的flink集群
42+
* 必选:否
43+
* 默认值:local
44+
45+
* **name**
46+
* 描述:flink 任务对应名称。
47+
* 必选:是
48+
* 默认值:无
49+
50+
* **sql**
51+
* 描述:执行flink sql 的主体语句。
52+
* 必选:是
53+
* 默认值:无
54+
55+
* **localSqlPluginPath**
56+
* 描述:本地插件根目录地址,也就是打包后产生的plugins目录。
57+
* 必选:是
58+
* 默认值:无
59+
60+
* **remoteSqlPluginPath**
61+
* 描述:flink执行集群上的插件根目录地址(将打包好的插件存放到各个flink节点上,如果是yarn集群需要存放到所有的nodemanager上)。
62+
* 必选:否
63+
* 默认值:无
64+
65+
* **addjar**
66+
* 描述:扩展jar路径,当前主要是UDF定义的jar;
67+
* 必选:否
68+
* 默认值:无
69+
70+
* **confProp**
71+
* 描述:一些参数设置
72+
* 必选:否
73+
* 默认值:无
74+
* 可选参数:
75+
sql.env.parallelism: 默认并行度设置
76+
sql.max.env.parallelism: 最大并行度设置
77+
time.characteristic: 可选值[ProcessingTime|IngestionTime|EventTime]
78+
sql.checkpoint.interval: 设置了该参数表明开启checkpoint(ms)
79+
sql.checkpoint.mode: 可选值[EXACTLY_ONCE|AT_LEAST_ONCE]
80+
sql.checkpoint.timeout: 生成checkpoint的超时时间(ms)
81+
sql.max.concurrent.checkpoints: 最大并发生成checkpoint数
82+
sql.checkpoint.cleanup.mode: 默认是不会将checkpoint存储到外部存储,[true(任务cancel之后会删除外部存储)|false(外部存储需要手动删除)]
83+
flinkCheckpointDataURI: 设置checkpoint的外部存储路径,根据实际的需求设定文件路径,hdfs://, file://
84+
85+
86+
* **flinkconf**
87+
* 描述:flink配置文件所在的目录(单机模式下不需要),如/hadoop/flink-1.4.0/conf
88+
* 必选:否
89+
* 默认值:无
90+
91+
* **yarnconf**
92+
* 描述:Hadoop配置文件(包括hdfs和yarn)所在的目录(单机模式下不需要),如/hadoop/etc/hadoop
93+
* 必选:否
94+
* 默认值:无

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,6 @@ public static void main(String[] args) throws Exception {
119119
Preconditions.checkNotNull(sql, "it requires input parameters sql");
120120
Preconditions.checkNotNull(name, "it requires input parameters name");
121121
Preconditions.checkNotNull(localSqlPluginPath, "it requires input parameters localSqlPluginPath");
122-
Preconditions.checkNotNull(remoteSqlPluginPath, "it requires input parameters remoteSqlPluginPath");
123122

124123
sql = URLDecoder.decode(sql, Charsets.UTF_8.name());
125124
SqlParser.setLocalSqlPluginRoot(localSqlPluginPath);
@@ -260,7 +259,7 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
260259
}else{
261260
fields += ",PROCTIME.PROCTIME";
262261
}
263-
//tableEnv.registerDataStream(tableInfo.getName(), adaptStream, fields);
262+
264263
Table regTable = tableEnv.fromDataStream(adaptStream, fields);
265264
tableEnv.registerTable(tableInfo.getName(), regTable);
266265
registerTableCache.put(tableInfo.getName(), regTable);

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherOptionParser.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,11 @@ public LauncherOptionParser(String[] args) {
8181
String localPlugin = Preconditions.checkNotNull(cl.getOptionValue(OPTION_LOCAL_SQL_PLUGIN_PATH));
8282
properties.put(OPTION_LOCAL_SQL_PLUGIN_PATH, localPlugin);
8383

84-
String remotePlugin = Preconditions.checkNotNull(cl.getOptionValue(OPTION_REMOTE_SQL_PLUGIN_PATH));
85-
properties.put(OPTION_REMOTE_SQL_PLUGIN_PATH, remotePlugin);
84+
String remotePlugin = cl.getOptionValue(OPTION_REMOTE_SQL_PLUGIN_PATH);
85+
if(!mode.equalsIgnoreCase(ClusterMode.MODE_LOCAL)){
86+
Preconditions.checkNotNull(remotePlugin);
87+
properties.put(OPTION_REMOTE_SQL_PLUGIN_PATH, remotePlugin);
88+
}
8689

8790
String name = Preconditions.checkNotNull(cl.getOptionValue(OPTION_NAME));
8891
properties.put(OPTION_NAME, name);

0 commit comments

Comments
 (0)