Skip to content

Commit 0fae6be

Browse files
committed
Merge branch 'v1.8.0_dev' into v1.9.0_dev
2 parents e90cc37 + 95fa295 commit 0fae6be

File tree

5 files changed

+54
-62
lines changed

5 files changed

+54
-62
lines changed

cassandra/cassandra-side/cassandra-side-core/src/main/java/com/dtstack/flink/sql/side/cassandra/table/CassandraSideParser.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,8 @@ public class CassandraSideParser extends AbsSideTableParser {
6868

6969
public static final String POOL_TIMEOUT_MILLIS_KEY = "poolTimeoutMillis";
7070

71-
static {
72-
keyPatternMap.put(SIDE_SIGN_KEY, SIDE_TABLE_SIGN);
73-
keyHandlerMap.put(SIDE_SIGN_KEY, CassandraSideParser::dealSideSign);
71+
public CassandraSideParser() {
72+
addParserHandler(SIDE_SIGN_KEY, SIDE_TABLE_SIGN, this::dealSideSign);
7473
}
7574

7675
@Override
@@ -97,7 +96,7 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
9796
return cassandraSideTableInfo;
9897
}
9998

100-
private static void dealSideSign(Matcher matcher, TableInfo tableInfo) {
99+
private void dealSideSign(Matcher matcher, TableInfo tableInfo) {
101100
}
102101

103102
public Class dbTypeConvertToJavaType(String fieldType) {

core/src/main/java/com/dtstack/flink/sql/table/AbsSideTableParser.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,11 @@ public abstract class AbsSideTableParser extends AbsTableParser {
4040

4141
private final static Pattern SIDE_TABLE_SIGN = Pattern.compile("(?i)^PERIOD\\s+FOR\\s+SYSTEM_TIME$");
4242

43-
static {
44-
keyPatternMap.put(SIDE_SIGN_KEY, SIDE_TABLE_SIGN);
45-
keyHandlerMap.put(SIDE_SIGN_KEY, AbsSideTableParser::dealSideSign);
43+
public AbsSideTableParser() {
44+
addParserHandler(SIDE_SIGN_KEY, SIDE_TABLE_SIGN, this::dealSideSign);
4645
}
4746

48-
private static void dealSideSign(Matcher matcher, TableInfo tableInfo){
47+
private void dealSideSign(Matcher matcher, TableInfo tableInfo){
4948
//FIXME SIDE_TABLE_SIGN current just used as a sign for side table; and do nothing
5049
}
5150

core/src/main/java/com/dtstack/flink/sql/table/AbsSourceParser.java

Lines changed: 8 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
package com.dtstack.flink.sql.table;
2222

23-
import com.dtstack.flink.sql.util.ClassUtil;
2423
import com.dtstack.flink.sql.util.MathUtil;
2524

2625
import java.util.regex.Matcher;
@@ -39,33 +38,25 @@ public abstract class AbsSourceParser extends AbsTableParser {
3938
private static final String VIRTUAL_KEY = "virtualFieldKey";
4039
private static final String WATERMARK_KEY = "waterMarkKey";
4140
private static final String NOTNULL_KEY = "notNullKey";
42-
private static final String NEST_JSON_FIELD_KEY = "nestFieldKey";
4341

44-
private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
4542
private static Pattern virtualFieldKeyPattern = Pattern.compile("(?i)^(\\S+\\([^\\)]+\\))\\s+AS\\s+(\\w+)$");
4643
private static Pattern waterMarkKeyPattern = Pattern.compile("(?i)^\\s*WATERMARK\\s+FOR\\s+(\\S+)\\s+AS\\s+withOffset\\(\\s*(\\S+)\\s*,\\s*(\\d+)\\s*\\)$");
4744
private static Pattern notNullKeyPattern = Pattern.compile("(?i)^(\\w+)\\s+(\\w+)\\s+NOT\\s+NULL?$");
4845

49-
static {
50-
keyPatternMap.put(VIRTUAL_KEY, virtualFieldKeyPattern);
51-
keyPatternMap.put(WATERMARK_KEY, waterMarkKeyPattern);
52-
keyPatternMap.put(NOTNULL_KEY, notNullKeyPattern);
53-
keyPatternMap.put(NEST_JSON_FIELD_KEY, nestJsonFieldKeyPattern);
54-
55-
keyHandlerMap.put(VIRTUAL_KEY, AbsSourceParser::dealVirtualField);
56-
keyHandlerMap.put(WATERMARK_KEY, AbsSourceParser::dealWaterMark);
57-
keyHandlerMap.put(NOTNULL_KEY, AbsSourceParser::dealNotNull);
58-
keyHandlerMap.put(NEST_JSON_FIELD_KEY, AbsSourceParser::dealNestField);
46+
public AbsSourceParser() {
47+
addParserHandler(VIRTUAL_KEY, virtualFieldKeyPattern, this::dealVirtualField);
48+
addParserHandler(WATERMARK_KEY, waterMarkKeyPattern, this::dealWaterMark);
49+
addParserHandler(NOTNULL_KEY, notNullKeyPattern, this::dealNotNull);
5950
}
6051

61-
static void dealVirtualField(Matcher matcher, TableInfo tableInfo){
52+
protected void dealVirtualField(Matcher matcher, TableInfo tableInfo){
6253
SourceTableInfo sourceTableInfo = (SourceTableInfo) tableInfo;
6354
String fieldName = matcher.group(2);
6455
String expression = matcher.group(1);
6556
sourceTableInfo.addVirtualField(fieldName, expression);
6657
}
6758

68-
static void dealWaterMark(Matcher matcher, TableInfo tableInfo){
59+
protected void dealWaterMark(Matcher matcher, TableInfo tableInfo){
6960
SourceTableInfo sourceTableInfo = (SourceTableInfo) tableInfo;
7061
String eventTimeField = matcher.group(1);
7162
//FIXME Temporarily resolve the second parameter row_time_field
@@ -74,10 +65,10 @@ static void dealWaterMark(Matcher matcher, TableInfo tableInfo){
7465
sourceTableInfo.setMaxOutOrderness(offset);
7566
}
7667

77-
static void dealNotNull(Matcher matcher, TableInfo tableInfo) {
68+
protected void dealNotNull(Matcher matcher, TableInfo tableInfo) {
7869
String fieldName = matcher.group(1);
7970
String fieldType = matcher.group(2);
80-
Class fieldClass= ClassUtil.stringConvertClass(fieldType);
71+
Class fieldClass= dbTypeConvertToJavaType(fieldType);
8172
TableInfo.FieldExtraInfo fieldExtraInfo = new TableInfo.FieldExtraInfo();
8273
fieldExtraInfo.setNotNull(true);
8374

@@ -88,24 +79,4 @@ static void dealNotNull(Matcher matcher, TableInfo tableInfo) {
8879
tableInfo.addFieldExtraInfo(fieldExtraInfo);
8980
}
9081

91-
/**
92-
* add parser for alias field
93-
* @param matcher
94-
* @param tableInfo
95-
*/
96-
static void dealNestField(Matcher matcher, TableInfo tableInfo) {
97-
String physicalField = matcher.group(1);
98-
String fieldType = matcher.group(3);
99-
String mappingField = matcher.group(4);
100-
Class fieldClass= ClassUtil.stringConvertClass(fieldType);
101-
boolean notNull = matcher.group(5) != null;
102-
TableInfo.FieldExtraInfo fieldExtraInfo = new TableInfo.FieldExtraInfo();
103-
fieldExtraInfo.setNotNull(notNull);
104-
105-
tableInfo.addPhysicalMappings(mappingField, physicalField);
106-
tableInfo.addField(mappingField);
107-
tableInfo.addFieldClass(fieldClass);
108-
tableInfo.addFieldType(fieldType);
109-
tableInfo.addFieldExtraInfo(fieldExtraInfo);
110-
}
11182
}

core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.table;
2222

@@ -40,16 +40,18 @@
4040
public abstract class AbsTableParser {
4141

4242
private static final String PRIMARY_KEY = "primaryKey";
43+
private static final String NEST_JSON_FIELD_KEY = "nestFieldKey";
4344

4445
private static Pattern primaryKeyPattern = Pattern.compile("(?i)PRIMARY\\s+KEY\\s*\\((.*)\\)");
46+
private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
4547

46-
public static Map<String, Pattern> keyPatternMap = Maps.newHashMap();
48+
private Map<String, Pattern> patternMap = Maps.newHashMap();
4749

48-
public static Map<String, ITableFieldDealHandler> keyHandlerMap = Maps.newHashMap();
50+
private Map<String, ITableFieldDealHandler> handlerMap = Maps.newHashMap();
4951

50-
static {
51-
keyPatternMap.put(PRIMARY_KEY, primaryKeyPattern);
52-
keyHandlerMap.put(PRIMARY_KEY, AbsTableParser::dealPrimaryKey);
52+
public AbsTableParser() {
53+
addParserHandler(PRIMARY_KEY, primaryKeyPattern, this::dealPrimaryKey);
54+
addParserHandler(NEST_JSON_FIELD_KEY, nestJsonFieldKeyPattern, this::dealNestField);
5355
}
5456

5557
protected boolean fieldNameNeedsUpperCase() {
@@ -59,12 +61,12 @@ protected boolean fieldNameNeedsUpperCase() {
5961
public abstract TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) throws Exception;
6062

6163
public boolean dealKeyPattern(String fieldRow, TableInfo tableInfo){
62-
for(Map.Entry<String, Pattern> keyPattern : keyPatternMap.entrySet()){
64+
for(Map.Entry<String, Pattern> keyPattern : patternMap.entrySet()){
6365
Pattern pattern = keyPattern.getValue();
6466
String key = keyPattern.getKey();
6567
Matcher matcher = pattern.matcher(fieldRow);
6668
if(matcher.find()){
67-
ITableFieldDealHandler handler = keyHandlerMap.get(key);
69+
ITableFieldDealHandler handler = handlerMap.get(key);
6870
if(handler == null){
6971
throw new RuntimeException("parse field [" + fieldRow + "] error.");
7072
}
@@ -110,15 +112,40 @@ public void parseFieldsInfo(String fieldsInfo, TableInfo tableInfo){
110112
tableInfo.finish();
111113
}
112114

113-
public static void dealPrimaryKey(Matcher matcher, TableInfo tableInfo){
115+
public void dealPrimaryKey(Matcher matcher, TableInfo tableInfo){
114116
String primaryFields = matcher.group(1).trim();
115117
String[] splitArry = primaryFields.split(",");
116118
List<String> primaryKes = Lists.newArrayList(splitArry);
117119
tableInfo.setPrimaryKeys(primaryKes);
118120
}
119121

122+
/**
123+
* add parser for alias field
124+
* @param matcher
125+
* @param tableInfo
126+
*/
127+
protected void dealNestField(Matcher matcher, TableInfo tableInfo) {
128+
String physicalField = matcher.group(1);
129+
String fieldType = matcher.group(3);
130+
String mappingField = matcher.group(4);
131+
Class fieldClass= dbTypeConvertToJavaType(fieldType);
132+
boolean notNull = matcher.group(5) != null;
133+
TableInfo.FieldExtraInfo fieldExtraInfo = new TableInfo.FieldExtraInfo();
134+
fieldExtraInfo.setNotNull(notNull);
135+
136+
tableInfo.addPhysicalMappings(mappingField, physicalField);
137+
tableInfo.addField(mappingField);
138+
tableInfo.addFieldClass(fieldClass);
139+
tableInfo.addFieldType(fieldType);
140+
tableInfo.addFieldExtraInfo(fieldExtraInfo);
141+
}
142+
120143
public Class dbTypeConvertToJavaType(String fieldType) {
121144
return ClassUtil.stringConvertClass(fieldType);
122145
}
123146

147+
protected void addParserHandler(String parserName, Pattern pattern, ITableFieldDealHandler handler) {
148+
patternMap.put(parserName, pattern);
149+
handlerMap.put(parserName, handler);
150+
}
124151
}

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideParser.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222

2323
import com.dtstack.flink.sql.table.AbsSideTableParser;
2424
import com.dtstack.flink.sql.table.TableInfo;
25-
import com.dtstack.flink.sql.util.ClassUtil;
2625
import com.dtstack.flink.sql.util.MathUtil;
2726

2827
import java.util.Map;
@@ -54,13 +53,10 @@ public class HbaseSideParser extends AbsSideTableParser {
5453

5554
public static final String CACHE = "cache";
5655

57-
58-
static {
59-
keyPatternMap.put(FIELD_KEY, FIELD_PATTERN);
60-
keyHandlerMap.put(FIELD_KEY, HbaseSideParser::dealField);
56+
public HbaseSideParser() {
57+
addParserHandler(FIELD_KEY, FIELD_PATTERN, this::dealField);
6158
}
6259

63-
6460
@Override
6561
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
6662
HbaseSideTableInfo hbaseTableInfo = new HbaseSideTableInfo();
@@ -81,7 +77,7 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
8177
* @param matcher
8278
* @param tableInfo
8379
*/
84-
private static void dealField(Matcher matcher, TableInfo tableInfo){
80+
private void dealField(Matcher matcher, TableInfo tableInfo){
8581

8682
HbaseSideTableInfo sideTableInfo = (HbaseSideTableInfo) tableInfo;
8783
String filedDefineStr = matcher.group(1);
@@ -97,7 +93,7 @@ private static void dealField(Matcher matcher, TableInfo tableInfo){
9793
System.arraycopy(filedInfoArr, 0, filedNameArr, 0, filedInfoArr.length - 1);
9894
String fieldName = String.join(" ", filedNameArr);
9995
String fieldType = filedInfoArr[filedInfoArr.length - 1 ].trim();
100-
Class fieldClass = ClassUtil.stringConvertClass(filedInfoArr[1].trim());
96+
Class fieldClass = dbTypeConvertToJavaType(filedInfoArr[1].trim());
10197

10298
sideTableInfo.addColumnRealName(fieldName);
10399
sideTableInfo.addField(aliasStr);

0 commit comments

Comments
 (0)