Skip to content

Commit d832250

Browse files
committed
[not null][flinkStreamSQL支持not null 语法][17872]
1 parent 05329f8 commit d832250

File tree

6 files changed

+38
-19
lines changed

6 files changed

+38
-19
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@
7575
import org.slf4j.LoggerFactory;
7676

7777
import java.io.File;
78-
import java.io.IOException;
7978
import java.lang.reflect.Field;
8079
import java.lang.reflect.InvocationTargetException;
80+
import java.lang.reflect.Method;
8181
import java.net.URL;
8282
import java.net.URLClassLoader;
8383
import java.net.URLDecoder;
@@ -328,15 +328,20 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
328328
}
329329
}
330330

331-
private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws IOException, NoSuchMethodException {
331+
private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws Exception {
332332
StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ?
333333
StreamExecutionEnvironment.getExecutionEnvironment() :
334334
new MyLocalStreamEnvironment();
335335
env.getConfig().disableClosureCleaner();
336336
env.setParallelism(FlinkUtil.getEnvParallelism(confProperties));
337337

338338
Configuration globalJobParameters = new Configuration();
339-
globalJobParameters.addAllToProperties(confProperties);
339+
//Configuration unsupported set properties key-value
340+
Method method = Configuration.class.getDeclaredMethod("setValueInternal", String.class, Object.class);
341+
method.setAccessible(true);
342+
for (Map.Entry<Object, Object> prop : confProperties.entrySet()) {
343+
method.invoke(globalJobParameters, prop.getKey(), prop.getValue());
344+
}
340345

341346
ExecutionConfig exeConfig = env.getConfig();
342347
if(exeConfig.getGlobalJobParameters() == null){

core/src/main/java/com/dtstack/flink/sql/table/AbsSourceParser.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
package com.dtstack.flink.sql.table;
2222

23+
import com.dtstack.flink.sql.util.ClassUtil;
2324
import com.dtstack.flink.sql.util.MathUtil;
2425

2526
import java.util.regex.Matcher;
@@ -36,19 +37,21 @@
3637
public abstract class AbsSourceParser extends AbsTableParser {
3738

3839
private static final String VIRTUAL_KEY = "virtualFieldKey";
39-
4040
private static final String WATERMARK_KEY = "waterMarkKey";
41+
private static final String NOTNULL_KEY = "notNullKey";
4142

4243
private static Pattern virtualFieldKeyPattern = Pattern.compile("(?i)^(\\S+\\([^\\)]+\\))\\s+AS\\s+(\\w+)$");
43-
4444
private static Pattern waterMarkKeyPattern = Pattern.compile("(?i)^\\s*WATERMARK\\s+FOR\\s+(\\S+)\\s+AS\\s+withOffset\\(\\s*(\\S+)\\s*,\\s*(\\d+)\\s*\\)$");
45+
private static Pattern notNullKeyPattern = Pattern.compile("(?i)(\\w+)\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
4546

4647
static {
4748
keyPatternMap.put(VIRTUAL_KEY, virtualFieldKeyPattern);
4849
keyPatternMap.put(WATERMARK_KEY, waterMarkKeyPattern);
50+
keyPatternMap.put(NOTNULL_KEY, notNullKeyPattern);
4951

5052
keyHandlerMap.put(VIRTUAL_KEY, AbsSourceParser::dealVirtualField);
5153
keyHandlerMap.put(WATERMARK_KEY, AbsSourceParser::dealWaterMark);
54+
keyHandlerMap.put(NOTNULL_KEY, AbsSourceParser::dealNotNull);
5255
}
5356

5457
static void dealVirtualField(Matcher matcher, TableInfo tableInfo){
@@ -66,4 +69,19 @@ static void dealWaterMark(Matcher matcher, TableInfo tableInfo){
6669
sourceTableInfo.setEventTimeField(eventTimeField);
6770
sourceTableInfo.setMaxOutOrderness(offset);
6871
}
72+
73+
static void dealNotNull(Matcher matcher, TableInfo tableInfo) {
74+
String fieldName = matcher.group(1);
75+
String fieldType = matcher.group(2);
76+
Class fieldClass= ClassUtil.stringConvertClass(fieldType);
77+
boolean notNull = matcher.group(3) != null;
78+
TableInfo.FieldExtraInfo fieldExtraInfo = new TableInfo.FieldExtraInfo();
79+
fieldExtraInfo.setNotNull(notNull);
80+
81+
tableInfo.addPhysicalMappings(fieldName, fieldName);
82+
tableInfo.addField(fieldName);
83+
tableInfo.addFieldClass(fieldClass);
84+
tableInfo.addFieldType(fieldType);
85+
tableInfo.addFieldExtraInfo(fieldExtraInfo);
86+
}
6987
}

core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ public static class FieldExtraInfo implements Serializable {
189189
/**
190190
* default false:allow field is null
191191
*/
192-
boolean notNull;
192+
boolean notNull = false;
193193

194194
public boolean getNotNull() {
195195
return notNull;

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
package com.dtstack.flink.sql.source.kafka.table;
2222

2323
import com.dtstack.flink.sql.table.AbsSourceParser;
24-
import com.dtstack.flink.sql.table.SourceTableInfo;
2524
import com.dtstack.flink.sql.table.TableInfo;
2625
import com.dtstack.flink.sql.util.ClassUtil;
2726
import com.dtstack.flink.sql.util.MathUtil;
@@ -45,7 +44,7 @@ public class KafkaSourceParser extends AbsSourceParser {
4544

4645
private static final String KAFKA_NEST_FIELD_KEY = "nestFieldKey";
4746

48-
private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\w+\\.)*\\w+)\\s+(\\w+)(\\s+AS\\s+(\\w+))?(\\s+NOT\\s+NULL)?$");
47+
private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
4948

5049
static {
5150
keyPatternMap.put(KAFKA_NEST_FIELD_KEY, kafkaNestFieldKeyPattern);
@@ -56,10 +55,9 @@ public class KafkaSourceParser extends AbsSourceParser {
5655
static void dealNestField(Matcher matcher, TableInfo tableInfo) {
5756
String physicalField = matcher.group(1);
5857
String fieldType = matcher.group(3);
59-
String mappingField = matcher.group(5);
60-
mappingField = mappingField != null ? mappingField : physicalField;
58+
String mappingField = matcher.group(4);
6159
Class fieldClass= ClassUtil.stringConvertClass(fieldType);
62-
boolean notNull = matcher.group(6) != null;
60+
boolean notNull = matcher.group(5) != null;
6361
TableInfo.FieldExtraInfo fieldExtraInfo = new TableInfo.FieldExtraInfo();
6462
fieldExtraInfo.setNotNull(notNull);
6563

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class KafkaSourceParser extends AbsSourceParser {
4444

4545
private static final String KAFKA_NEST_FIELD_KEY = "nestFieldKey";
4646

47-
private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\w+\\.)*\\w+)\\s+(\\w+)(\\s+AS\\s+(\\w+))?(\\s+NOT\\s+NULL)?$");
47+
private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
4848

4949
static {
5050
keyPatternMap.put(KAFKA_NEST_FIELD_KEY, kafkaNestFieldKeyPattern);
@@ -60,10 +60,9 @@ public class KafkaSourceParser extends AbsSourceParser {
6060
static void dealNestField(Matcher matcher, TableInfo tableInfo) {
6161
String physicalField = matcher.group(1);
6262
String fieldType = matcher.group(3);
63-
String mappingField = matcher.group(5);
64-
mappingField = mappingField != null ? mappingField : physicalField;
63+
String mappingField = matcher.group(4);
6564
Class fieldClass= ClassUtil.stringConvertClass(fieldType);
66-
boolean notNull = matcher.group(6) != null;
65+
boolean notNull = matcher.group(5) != null;
6766
TableInfo.FieldExtraInfo fieldExtraInfo = new TableInfo.FieldExtraInfo();
6867
fieldExtraInfo.setNotNull(notNull);
6968

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class KafkaSourceParser extends AbsSourceParser {
4444

4545
private static final String KAFKA_NEST_FIELD_KEY = "nestFieldKey";
4646

47-
private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\w+\\.)*\\w+)\\s+(\\w+)(\\s+AS\\s+(\\w+))?(\\s+NOT\\s+NULL)?$");
47+
private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
4848

4949
static {
5050
keyPatternMap.put(KAFKA_NEST_FIELD_KEY, kafkaNestFieldKeyPattern);
@@ -60,10 +60,9 @@ public class KafkaSourceParser extends AbsSourceParser {
6060
static void dealNestField(Matcher matcher, TableInfo tableInfo) {
6161
String physicalField = matcher.group(1);
6262
String fieldType = matcher.group(3);
63-
String mappingField = matcher.group(5);
64-
mappingField = mappingField != null ? mappingField : physicalField;
63+
String mappingField = matcher.group(4);
6564
Class fieldClass= ClassUtil.stringConvertClass(fieldType);
66-
boolean notNull = matcher.group(6) != null;
65+
boolean notNull = matcher.group(5) != null;
6766
TableInfo.FieldExtraInfo fieldExtraInfo = new TableInfo.FieldExtraInfo();
6867
fieldExtraInfo.setNotNull(notNull);
6968

0 commit comments

Comments
 (0)