Skip to content

Commit 1c7f9cf

Browse files
committed
[kafka not null][flinkStreamSQL supports NOT NULL syntax][17872]
1 parent 9072007 commit 1c7f9cf

File tree

6 files changed

+33
-33
lines changed

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import com.dtstack.flink.sql.source.AbsDeserialization;
2525
import com.dtstack.flink.sql.source.kafka.metric.KafkaTopicPartitionLagMetric;
26+
import com.dtstack.flink.sql.table.TableInfo;
2627
import org.apache.flink.api.common.typeinfo.TypeInformation;
2728
import org.apache.flink.api.common.typeinfo.Types;
2829
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -49,6 +50,7 @@
4950
import java.sql.Time;
5051
import java.sql.Timestamp;
5152
import java.util.Iterator;
53+
import java.util.List;
5254
import java.util.Map;
5355
import java.util.Set;
5456

@@ -80,9 +82,6 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
8082
/** Types to parse fields as. Indices match fieldNames indices. */
8183
private final TypeInformation<?>[] fieldTypes;
8284

83-
/** Flag indicating whether to fail on a missing field. */
84-
private boolean failOnMissingField;
85-
8685
private AbstractFetcher<Row, ?> fetcher;
8786

8887
private boolean firstMsg = true;
@@ -91,15 +90,14 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
9190

9291
private Map<String, String> rowAndFieldMapping;
9392

93+
private List<TableInfo.FieldExtraInfo> fieldExtraInfos;
9494

95-
public CustomerJsonDeserialization(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping){
95+
public CustomerJsonDeserialization(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping, List<TableInfo.FieldExtraInfo> fieldExtraInfos){
9696
this.typeInfo = typeInfo;
97-
9897
this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
99-
10098
this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
101-
10299
this.rowAndFieldMapping= rowAndFieldMapping;
100+
this.fieldExtraInfos = fieldExtraInfos;
103101
}
104102

105103
@Override
@@ -129,9 +127,10 @@ public Row deserialize(byte[] message) throws IOException {
129127

130128
for (int i = 0; i < fieldNames.length; i++) {
131129
JsonNode node = getIgnoreCase(fieldNames[i]);
130+
TableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfos.get(i);
132131

133132
if (node == null) {
134-
if (failOnMissingField) {
133+
if (fieldExtraInfo != null && fieldExtraInfo.getNotNull()) {
135134
throw new IllegalStateException("Failed to find field with name '"
136135
+ fieldNames[i] + "'.");
137136
} else {
@@ -159,10 +158,6 @@ public Row deserialize(byte[] message) throws IOException {
159158
}
160159
}
161160

162-
public void setFailOnMissingField(boolean failOnMissingField) {
163-
this.failOnMissingField = failOnMissingField;
164-
}
165-
166161
private JsonNode getIgnoreCase(String key) {
167162
String nodeMappingKey = rowAndFieldMapping.getOrDefault(key, key);
168163
JsonNode node = nodeAndJsonNodeMapping.get(nodeMappingKey);

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,10 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
8787
FlinkKafkaConsumer09<Row> kafkaSrc;
8888
if (BooleanUtils.isTrue(kafka09SourceTableInfo.getTopicIsPattern())) {
8989
kafkaSrc = new CustomerKafka09Consumer(Pattern.compile(topicName),
90-
new CustomerJsonDeserialization(typeInformation, kafka09SourceTableInfo.getPhysicalFields()), props);
90+
new CustomerJsonDeserialization(typeInformation, kafka09SourceTableInfo.getPhysicalFields(), kafka09SourceTableInfo.getFieldExtraInfoList()), props);
9191
} else {
9292
kafkaSrc = new CustomerKafka09Consumer(topicName,
93-
new CustomerJsonDeserialization(typeInformation, kafka09SourceTableInfo.getPhysicalFields()), props);
93+
new CustomerJsonDeserialization(typeInformation, kafka09SourceTableInfo.getPhysicalFields(), kafka09SourceTableInfo.getFieldExtraInfoList()), props);
9494
}
9595

9696
//earliest,latest

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class KafkaSourceParser extends AbsSourceParser {
4545

4646
private static final String KAFKA_NEST_FIELD_KEY = "nestFieldKey";
4747

48-
private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)$");
48+
private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\w+\\.)*\\w+)\\s+(\\w+)(\\s+AS\\s+(\\w+))?(\\s+NOT\\s+NULL)?$");
4949

5050
static {
5151
keyPatternMap.put(KAFKA_NEST_FIELD_KEY, kafkaNestFieldKeyPattern);
@@ -56,13 +56,18 @@ public class KafkaSourceParser extends AbsSourceParser {
5656
static void dealNestField(Matcher matcher, TableInfo tableInfo) {
5757
String physicalField = matcher.group(1);
5858
String fieldType = matcher.group(3);
59-
String mappingField = matcher.group(4);
59+
String mappingField = matcher.group(5);
60+
mappingField = mappingField != null ? mappingField : physicalField;
6061
Class fieldClass= ClassUtil.stringConvertClass(fieldType);
62+
boolean notNull = matcher.group(6) != null;
63+
TableInfo.FieldExtraInfo fieldExtraInfo = new TableInfo.FieldExtraInfo();
64+
fieldExtraInfo.setNotNull(notNull);
6165

6266
tableInfo.addPhysicalMappings(mappingField, physicalField);
6367
tableInfo.addField(mappingField);
6468
tableInfo.addFieldClass(fieldClass);
6569
tableInfo.addFieldType(fieldType);
70+
tableInfo.addFieldExtraInfo(fieldExtraInfo);
6671
if(LOG.isInfoEnabled()){
6772
LOG.info(physicalField + "--->" + mappingField + " Class: " + fieldClass.toString());
6873
}

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import com.dtstack.flink.sql.source.AbsDeserialization;
2525
import com.dtstack.flink.sql.source.kafka.metric.KafkaTopicPartitionLagMetric;
26+
import com.dtstack.flink.sql.table.TableInfo;
2627
import org.apache.flink.api.common.typeinfo.TypeInformation;
2728
import org.apache.flink.api.common.typeinfo.Types;
2829
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -49,6 +50,7 @@
4950
import java.sql.Time;
5051
import java.sql.Timestamp;
5152
import java.util.Iterator;
53+
import java.util.List;
5254
import java.util.Map;
5355
import java.util.Set;
5456

@@ -80,9 +82,6 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
8082
/** Types to parse fields as. Indices match fieldNames indices. */
8183
private final TypeInformation<?>[] fieldTypes;
8284

83-
/** Flag indicating whether to fail on a missing field. */
84-
private boolean failOnMissingField;
85-
8685
private AbstractFetcher<Row, ?> fetcher;
8786

8887
private boolean firstMsg = true;
@@ -91,14 +90,14 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
9190

9291
private Map<String, String> rowAndFieldMapping;
9392

94-
public CustomerJsonDeserialization(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping){
95-
this.typeInfo = typeInfo;
93+
private List<TableInfo.FieldExtraInfo> fieldExtraInfos;
9694

95+
public CustomerJsonDeserialization(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping, List<TableInfo.FieldExtraInfo> fieldExtraInfos){
96+
this.typeInfo = typeInfo;
9797
this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
98-
9998
this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
100-
10199
this.rowAndFieldMapping= rowAndFieldMapping;
100+
this.fieldExtraInfos = fieldExtraInfos;
102101
}
103102

104103
@Override
@@ -129,9 +128,10 @@ public Row deserialize(byte[] message) throws IOException {
129128

130129
for (int i = 0; i < fieldNames.length; i++) {
131130
JsonNode node = getIgnoreCase(fieldNames[i]);
131+
TableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfos.get(i);
132132

133133
if (node == null) {
134-
if (failOnMissingField) {
134+
if (fieldExtraInfo != null && fieldExtraInfo.getNotNull()) {
135135
throw new IllegalStateException("Failed to find field with name '"
136136
+ fieldNames[i] + "'.");
137137
} else {
@@ -176,11 +176,6 @@ public JsonNode getIgnoreCase(String key) {
176176
return node;
177177
}
178178

179-
180-
public void setFailOnMissingField(boolean failOnMissingField) {
181-
this.failOnMissingField = failOnMissingField;
182-
}
183-
184179
private void parseTree(JsonNode jsonNode, String prefix){
185180

186181
Iterator<String> iterator = jsonNode.fieldNames();

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,10 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
8989
FlinkKafkaConsumer010<Row> kafkaSrc;
9090
if (BooleanUtils.isTrue(kafka010SourceTableInfo.getTopicIsPattern())) {
9191
kafkaSrc = new CustomerKafka010Consumer(Pattern.compile(topicName),
92-
new CustomerJsonDeserialization(typeInformation, kafka010SourceTableInfo.getPhysicalFields()), props);
92+
new CustomerJsonDeserialization(typeInformation, kafka010SourceTableInfo.getPhysicalFields(), kafka010SourceTableInfo.getFieldExtraInfoList()), props);
9393
} else {
9494
kafkaSrc = new CustomerKafka010Consumer(topicName,
95-
new CustomerJsonDeserialization(typeInformation, kafka010SourceTableInfo.getPhysicalFields()), props);
95+
new CustomerJsonDeserialization(typeInformation, kafka010SourceTableInfo.getPhysicalFields(), kafka010SourceTableInfo.getFieldExtraInfoList()), props);
9696
}
9797

9898
//earliest,latest

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class KafkaSourceParser extends AbsSourceParser {
4444

4545
private static final String KAFKA_NEST_FIELD_KEY = "nestFieldKey";
4646

47-
private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)$");
47+
private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((@*\\w+\\.)*\\w+)\\s+(\\w+)(\\s+AS\\s+(\\w+))?(\\s+NOT\\s+NULL)?$");
4848

4949
static {
5050
keyPatternMap.put(KAFKA_NEST_FIELD_KEY, kafkaNestFieldKeyPattern);
@@ -60,13 +60,18 @@ public class KafkaSourceParser extends AbsSourceParser {
6060
static void dealNestField(Matcher matcher, TableInfo tableInfo) {
6161
String physicalField = matcher.group(1);
6262
String fieldType = matcher.group(3);
63-
String mappingField = matcher.group(4);
63+
String mappingField = matcher.group(5);
64+
mappingField = mappingField != null ? mappingField : physicalField;
6465
Class fieldClass= ClassUtil.stringConvertClass(fieldType);
66+
boolean notNull = matcher.group(6) != null;
67+
TableInfo.FieldExtraInfo fieldExtraInfo = new TableInfo.FieldExtraInfo();
68+
fieldExtraInfo.setNotNull(notNull);
6569

6670
tableInfo.addPhysicalMappings(mappingField, physicalField);
6771
tableInfo.addField(mappingField);
6872
tableInfo.addFieldClass(fieldClass);
6973
tableInfo.addFieldType(fieldType);
74+
tableInfo.addFieldExtraInfo(fieldExtraInfo);
7075
if(LOG.isInfoEnabled()){
7176
LOG.info(physicalField + "--->" + mappingField + " Class: " + fieldClass.toString());
7277
}

0 commit comments

Comments (0)