Skip to content

Commit 47268b9

Browse files
committed
1、savepoint使用多线程并发执行 2、修复catalog版本checkpoint功能无效 3、sql预校验功能优化 4、部分代码重构和优化 5、网络检查功能优化
1 parent 38d1027 commit 47268b9

File tree

31 files changed

+520
-249
lines changed

31 files changed

+520
-249
lines changed

flink-streaming-commom/src/main/java/com/flink/streaming/common/constant/SystemConstant.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,18 @@ public class SystemConstant {
1414

1515
public final static String SEMICOLON = ";";
1616

17-
public final static String LINE_FEED= "\n";
17+
public final static String LINE_FEED = "\n";
1818

19-
public final static String SPACE= "";
19+
public final static String SPACE = "";
2020

2121
public static final int DEFAULT_PATTERN_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL;
2222

2323

2424
public final static String JARVERSION = "lib/flink-streaming-core-1.2.0.RELEASE.jar";
2525

26+
27+
public static final String QUERY_JOBID_KEY_WORD = "job-submitted-success:";
28+
29+
public static final String QUERY_JOBID_KEY_WORD_BACKUP = "Job has been submitted with JobID";
30+
2631
}

flink-streaming-commom/src/main/java/com/flink/streaming/common/enums/SqlCommand.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ public enum SqlCommand {
7070
"(ALTER\\s+FUNCTION.*)",
7171
(operands) -> Optional.of(new String[]{operands[0]})),
7272

73-
// SELECT(
74-
// "(WITH.*SELECT.*|SELECT.*)",
75-
// (operands) -> Optional.of(new String[]{operands[0]})),
73+
SELECT(
74+
"(WITH.*SELECT.*|SELECT.*)",
75+
(operands) -> Optional.of(new String[]{operands[0]})),
7676

7777
SHOW_CATALOGS(
7878
"SHOW\\s+CATALOGS",
@@ -117,7 +117,6 @@ public enum SqlCommand {
117117
public final Function<String[], Optional<String[]>> operandConverter;
118118

119119

120-
121120
SqlCommand(String matchingRegex, Function<String[], Optional<String[]>> operandConverter) {
122121
this.pattern = Pattern.compile(matchingRegex, SystemConstant.DEFAULT_PATTERN_FLAGS);
123122
this.operandConverter = operandConverter;

flink-streaming-commom/src/main/java/com/flink/streaming/common/sql/SqlFileParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public static List<SqlCommandCall> fileToSql(List<String> lineList) {
3838
trimStart(line).startsWith(SystemConstant.COMMENT_SYMBOL)) {
3939
continue;
4040
}
41-
stmt.append("\n").append(line);
41+
stmt.append(SystemConstant.LINE_FEED).append(line);
4242
if (line.trim().endsWith(SystemConstant.SEMICOLON)) {
4343
Optional<SqlCommandCall> optionalCall = parse(stmt.toString());
4444
if (optionalCall.isPresent()) {

flink-streaming-core/src/main/java/com/flink/streaming/core/JobApplication.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package com.flink.streaming.core;
22

33

4+
import com.flink.streaming.common.constant.SystemConstant;
45
import com.flink.streaming.common.model.SqlCommandCall;
56
import com.flink.streaming.common.sql.SqlFileParser;
67
import com.flink.streaming.core.checkpoint.CheckPointParams;
8+
import com.flink.streaming.core.checkpoint.FsCheckPoint;
79
import com.flink.streaming.core.execute.ExecuteSql;
810
import com.flink.streaming.core.model.JobRunParam;
11+
import org.apache.flink.api.common.JobID;
912
import org.apache.flink.api.java.utils.ParameterTool;
1013
import org.apache.flink.calcite.shaded.com.google.common.base.Preconditions;
1114
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -49,6 +52,9 @@ public static void main(String[] args) {
4952

5053
TableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
5154

55+
//设置checkPoint
56+
FsCheckPoint.setCheckpoint(env, jobRunParam.getCheckPointParam());
57+
5258
List<String> sql = Files.readAllLines(Paths.get(jobRunParam.getSqlPath()));
5359

5460
List<SqlCommandCall> sqlCommandCallList = SqlFileParser.fileToSql(sql);
@@ -57,13 +63,18 @@ public static void main(String[] args) {
5763

5864
ExecuteSql.exeSql(sqlCommandCallList, tEnv, statementSet);
5965

66+
67+
6068
TableResult tableResult = statementSet.execute();
6169
if (tableResult == null || tableResult.getJobClient().get() == null ||
6270
tableResult.getJobClient().get().getJobID() == null) {
6371
throw new RuntimeException("任务运行失败 没有获取到JobID");
6472
}
65-
System.out.println("任务提交成功 jobId=" + tableResult.getJobClient().get().getJobID());
66-
log.info("任务提交成功 jobId={}", tableResult.getJobClient().get().getJobID());
73+
JobID jobID=tableResult.getJobClient().get().getJobID();
74+
75+
System.out.println(SystemConstant.QUERY_JOBID_KEY_WORD + jobID);
76+
77+
log.info(SystemConstant.QUERY_JOBID_KEY_WORD + "{}",jobID);
6778

6879
} catch (Exception e) {
6980
System.err.println("任务执行失败:" + e.getMessage());

flink-streaming-core/src/main/java/com/flink/streaming/core/execute/ExecuteSql.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public static void exeSql(List<SqlCommandCall> sqlCommandCallList, TableEnvironm
3838
statementSet.addInsertSql(sqlCommandCall.operands[0]);
3939
break;
4040
//显示语句
41+
case SELECT:
4142
case SHOW_CATALOGS:
4243
case SHOW_DATABASES:
4344
case SHOW_MODULES:

flink-streaming-core/src/main/java/com/flink/streaming/core/logs/LogPrint.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.flink.streaming.core.logs;
22

3+
import com.flink.streaming.common.enums.SqlCommand;
34
import com.flink.streaming.common.model.SqlCommandCall;
45
import lombok.extern.slf4j.Slf4j;
56
import org.apache.flink.table.api.TableEnvironment;
@@ -43,7 +44,12 @@ public static void queryRestPrint(TableEnvironment tEnv, SqlCommandCall sqlComma
4344
}
4445
LogPrint.logPrint(sqlCommandCall);
4546

46-
tEnv.executeSql(sqlCommandCall.operands[0]).print();
47+
48+
if (sqlCommandCall.getSqlCommand().name().equalsIgnoreCase(SqlCommand.SELECT.name())) {
49+
throw new RuntimeException("目前不支持select 语法使用");
50+
} else {
51+
tEnv.executeSql(sqlCommandCall.operands[0]).print();
52+
}
4753

4854
// if (sqlCommandCall.getSqlCommand().name().equalsIgnoreCase(SqlCommand.SELECT.name())) {
4955
// Iterator<Row> it = tEnv.executeSql(sqlCommandCall.operands[0]).collect();
@@ -53,4 +59,5 @@ public static void queryRestPrint(TableEnvironment tEnv, SqlCommandCall sqlComma
5359
// }
5460
// }
5561
}
62+
5663
}

flink-streaming-validation/src/main/java/com/flink/streaming/sql/validation/SqlValidation.java

Lines changed: 53 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,38 +4,35 @@
44
import com.flink.streaming.common.model.SqlCommandCall;
55
import com.flink.streaming.common.sql.SqlFileParser;
66
import lombok.extern.slf4j.Slf4j;
7+
import org.apache.calcite.config.Lex;
8+
import org.apache.calcite.sql.parser.SqlParser;
9+
import org.apache.calcite.sql.validate.SqlConformance;
710
import org.apache.commons.lang3.StringUtils;
8-
import org.apache.flink.configuration.Configuration;
11+
import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
912
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
10-
import org.apache.flink.table.api.EnvironmentSettings;
11-
import org.apache.flink.table.api.SqlDialect;
12-
import org.apache.flink.table.api.StatementSet;
13-
import org.apache.flink.table.api.TableEnvironment;
13+
import org.apache.flink.table.api.*;
1414
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
1515
import org.apache.flink.table.api.config.TableConfigOptions;
16+
import org.apache.flink.table.planner.calcite.CalciteConfig;
17+
import org.apache.flink.table.planner.calcite.CalciteParser;
18+
import org.apache.flink.table.planner.delegation.FlinkSqlParserFactories;
19+
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
20+
import org.apache.flink.table.planner.utils.TableConfigUtils;
1621

1722
import java.util.Arrays;
1823
import java.util.Collections;
1924
import java.util.List;
2025

21-
/**
22-
* @author zhuhuipei
23-
* @Description:
24-
* @date 2021/1/17
25-
* @time 10:51
26-
*/
26+
2727
@Slf4j
2828
public class SqlValidation {
2929

30-
3130
//TODO 暂时没有找到好的解决方案
32-
3331
/**
34-
* sql校验
3532
*
3633
* @author zhuhuipei
37-
* @date 2021/1/21
38-
* @time 22:24
34+
* @date 2021/3/27
35+
* @time 10:10
3936
*/
4037
public static void preCheckSql(List<String> sql) {
4138

@@ -49,9 +46,7 @@ public static void preCheckSql(List<String> sql) {
4946
TableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
5047

5148
List<SqlCommandCall> sqlCommandCallList = SqlFileParser.fileToSql(sql);
52-
53-
StatementSet statementSet = tEnv.createStatementSet();
54-
49+
TableConfig config = tEnv.getConfig();
5550
String value = null;
5651

5752
try {
@@ -60,46 +55,65 @@ public static void preCheckSql(List<String> sql) {
6055
value = sqlCommandCall.operands[0];
6156

6257
switch (sqlCommandCall.sqlCommand) {
63-
case USE_CATALOG:
64-
case CREATE_CATALOG:
65-
throw new RuntimeException("暂时不支持 CATALOG 相关语法校验(请不要点击 sql预校验 按钮了)");
66-
//配置
58+
//配置
6759
case SET:
6860
String key = sqlCommandCall.operands[0];
6961
String val = sqlCommandCall.operands[1];
70-
62+
if (val.contains(SystemConstant.LINE_FEED)){
63+
throw new RuntimeException("set 语法值异常:"+val) ;
64+
}
7165
if (TableConfigOptions.TABLE_SQL_DIALECT.key().equalsIgnoreCase(key.trim())
72-
&& SqlDialect.HIVE.name().equalsIgnoreCase(val.trim())) {
73-
throw new RuntimeException("暂时不支持 Hive相关语法校验 (请不要点击 sql预校验 按钮)");
66+
&& SqlDialect.HIVE.name().equalsIgnoreCase(val.trim())) {
67+
config.setSqlDialect(SqlDialect.HIVE);
68+
}else{
69+
config.setSqlDialect(SqlDialect.DEFAULT);
7470
}
75-
Configuration configuration = tEnv.getConfig().getConfiguration();
76-
configuration.setString(key, val);
77-
break;
78-
//insert 语句
79-
case INSERT_INTO:
80-
case INSERT_OVERWRITE:
81-
statementSet.addInsertSql(sqlCommandCall.operands[0]);
71+
8272
break;
8373
//其他
8474
default:
85-
tEnv.executeSql(sqlCommandCall.operands[0]);
75+
CalciteParser parser = new CalciteParser(getSqlParserConfig(config));
76+
parser.parse(sqlCommandCall.operands[0]);
8677
break;
8778
}
8879
}
8980
} catch (Exception e) {
90-
log.warn("语法异常:{} 原因 {}", value, e);
91-
throw new RuntimeException("语法异常 " + value + " 原因 " + e.getMessage());
81+
log.warn("语法异常: sql={} 原因是: {}", value, e);
82+
throw new RuntimeException("语法异常 sql=" + value + " 原因: " + e.getMessage());
9283
}
9384

9485
}
9586

87+
private static SqlParser.Config getSqlParserConfig(TableConfig tableConfig) {
88+
return JavaScalaConversionUtil.toJava(getCalciteConfig(tableConfig).getSqlParserConfig()).orElseGet(
89+
() -> {
90+
SqlConformance conformance = getSqlConformance(tableConfig.getSqlDialect());
91+
return SqlParser
92+
.config()
93+
.withParserFactory(FlinkSqlParserFactories.create(conformance))
94+
.withConformance(conformance)
95+
.withLex(Lex.JAVA)
96+
.withIdentifierMaxLength(256);
97+
}
98+
);
99+
}
96100

101+
private static CalciteConfig getCalciteConfig(TableConfig tableConfig) {
102+
return TableConfigUtils.getCalciteConfig(tableConfig);
103+
}
104+
private static FlinkSqlConformance getSqlConformance(SqlDialect sqlDialect) {
105+
switch (sqlDialect) {
106+
case HIVE:
107+
return FlinkSqlConformance.HIVE;
108+
case DEFAULT:
109+
return FlinkSqlConformance.DEFAULT;
110+
default:
111+
throw new TableException("Unsupported SQL dialect: " + sqlDialect);
112+
}
113+
}
97114
/**
98115
* 字符串转sql
99116
*
100-
* @author zhuhuipei
101-
* @date 2021/1/22
102-
* @time 22:45
103117
*/
104118
public static List<String> toSqlList(String sql) {
105119
if (StringUtils.isEmpty(sql)) {

flink-streaming-web/src/main/java/com/flink/streaming/web/adapter/CommandAdapter.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.flink.streaming.web.adapter;
22

3+
import com.flink.streaming.web.enums.DeployModeEnum;
4+
35
/**
46
* @author zhuhuipei
57
* @Description:
@@ -8,24 +10,16 @@
810
*/
911
public interface CommandAdapter {
1012

11-
/**
12-
* yarn per模式启动任务
13-
*
14-
* @author zhuhuipei
15-
* @date 2020-09-18
16-
* @time 20:18
17-
*/
18-
void startForPerYarn(String command, StringBuilder localLog, Long jobRunLogId) throws Exception;
19-
2013

2114
/**
22-
* 启动本地模式
15+
* 启动服务
2316
*
2417
* @author zhuhuipei
25-
* @date 2020/11/1
26-
* @time 10:15
18+
* @date 2021/3/26
19+
* @time 17:31
2720
*/
28-
String startForLocal(String command, StringBuilder localLog, Long jobRunLogId) throws Exception;
21+
String submitJob(String command, StringBuilder localLog, Long jobRunLogId, DeployModeEnum deployModeEnum) throws Exception;
22+
2923

3024
/**
3125
* yarn per模式执行savepoint

0 commit comments

Comments (0)