Commit b971db1

Add support for parsing nested JSON structures. Column format: a.b int AS newField

1 parent 0334320 commit b971db1
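
In practice, a source-table column can now be declared against a nested JSON path and exposed under an alias: a declaration like a.b int AS newField reads the value at {"a": {"b": ...}} in each Kafka message and surfaces it as the row field newField. The pieces below add the alias-to-path bookkeeping in TableInfo, the nested-field column syntax in each KafkaSourceParser, and the dotted-path lookup in each CustomerJsonDeserialization.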

File tree

10 files changed: +173 lines, -15 lines

core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java

Lines changed: 17 additions & 0 deletions

@@ -21,9 +21,11 @@
 package com.dtstack.flink.sql.table;
 
 import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Reason:
@@ -48,6 +50,9 @@ public abstract class TableInfo implements Serializable {
 
     private final List<String> fieldList = Lists.newArrayList();
 
+    /** key: alias, value: realField */
+    private Map<String, String> physicalFields = Maps.newHashMap();
+
     private final List<String> fieldTypeList = Lists.newArrayList();
 
     private final List<Class> fieldClassList = Lists.newArrayList();
@@ -114,6 +119,10 @@ public void addField(String fieldName){
         fieldList.add(fieldName);
     }
 
+    public void addPhysicalMappings(String aliasName, String physicalFieldName){
+        physicalFields.put(aliasName, physicalFieldName);
+    }
+
     public void addFieldClass(Class fieldClass){
         fieldClassList.add(fieldClass);
     }
@@ -146,6 +155,14 @@ public List<Class> getFieldClassList() {
         return fieldClassList;
     }
 
+    public Map<String, String> getPhysicalFields() {
+        return physicalFields;
+    }
+
+    public void setPhysicalFields(Map<String, String> physicalFields) {
+        this.physicalFields = physicalFields;
+    }
+
     public void finish(){
         this.fields = fieldList.toArray(new String[fieldList.size()]);
         this.fieldClasses = fieldClassList.toArray(new Class[fieldClassList.size()]);
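
A minimal standalone sketch of the new map's semantics (the field names and the wrapper class are illustrative; only the alias-to-physical-field convention and the getOrDefault fallback come from this commit):

import java.util.HashMap;
import java.util.Map;

public class PhysicalFieldMappingSketch {
    public static void main(String[] args) {
        // Mirrors TableInfo.physicalFields: key = alias, value = real (possibly nested) field.
        Map<String, String> physicalFields = new HashMap<>();

        // What addPhysicalMappings("newField", "a.b") records for the column "a.b int AS newField":
        physicalFields.put("newField", "a.b");

        // What the deserializer's getIgnoreCase() does: fall back to the field name itself
        // when no alias mapping exists (plain, non-nested columns).
        System.out.println(physicalFields.getOrDefault("newField", "newField")); // a.b
        System.out.println(physicalFields.getOrDefault("channel", "channel"));   // channel
    }
}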

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 21 additions & 3 deletions

@@ -30,6 +30,7 @@
 import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;
 import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.types.Row;
@@ -80,13 +81,17 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
 
     private Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();
 
+    private Map<String, String> rowAndFieldMapping;
 
-    public CustomerJsonDeserialization(TypeInformation<Row> typeInfo){
+
+    public CustomerJsonDeserialization(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping){
         this.typeInfo = typeInfo;
 
         this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
 
         this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
+
+        this.rowAndFieldMapping = rowAndFieldMapping;
     }
 
     @Override
@@ -111,7 +116,7 @@ public Row deserialize(byte[] message) throws IOException {
         Row row = new Row(fieldNames.length);
 
         for (int i = 0; i < fieldNames.length; i++) {
-            JsonNode node = nodeAndJsonNodeMapping.get(fieldNames[i]);
+            JsonNode node = getIgnoreCase(fieldNames[i]);
 
             if (node == null) {
                 if (failOnMissingField) {
@@ -133,16 +138,29 @@ public Row deserialize(byte[] message) throws IOException {
             //add metric of dirty data
             dirtyDataCounter.inc();
             return null;
+        } finally {
+            nodeAndJsonNodeMapping.clear();
         }
     }
 
     public void setFailOnMissingField(boolean failOnMissingField) {
         this.failOnMissingField = failOnMissingField;
     }
 
+    private JsonNode getIgnoreCase(String key) {
+        String nodeMappingKey = rowAndFieldMapping.getOrDefault(key, key);
+        JsonNode node = nodeAndJsonNodeMapping.get(nodeMappingKey);
+        JsonNodeType nodeType = node.getNodeType();
+
+        if (nodeType == JsonNodeType.ARRAY) {
+            throw new IllegalStateException("Unsupported type information array.");
+        }
+
+        return node;
+    }
+
 
     private void parseTree(JsonNode jsonNode, String prefix){
-        nodeAndJsonNodeMapping.clear();
 
         Iterator<String> iterator = jsonNode.fieldNames();
         while (iterator.hasNext()){
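
The diff shows only the first lines of parseTree, but its prefix parameter together with the dotted physical names implies it flattens nested objects into nodeAndJsonNodeMapping under dot-separated keys, which getIgnoreCase then resolves through the alias map. A standalone sketch of that flow, using plain Jackson instead of Flink's shaded copy (the recursion and the null guard are assumptions; only the dotted-key convention, the getOrDefault fallback, and the map roles come from the diff):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class NestedJsonFlattenSketch {

    // Flatten {"a":{"b":1},"c":2} into {"a.b": 1, "c": 2}.
    static void parseTree(JsonNode jsonNode, String prefix, Map<String, JsonNode> out) {
        Iterator<String> it = jsonNode.fieldNames();
        while (it.hasNext()) {
            String name = it.next();
            JsonNode child = jsonNode.get(name);
            String key = prefix.isEmpty() ? name : prefix + "." + name;
            if (child.isObject()) {
                parseTree(child, key, out);   // recurse into nested objects
            } else {
                out.put(key, child);          // leaves become addressable by dotted path
            }
        }
    }

    public static void main(String[] args) throws Exception {
        JsonNode root = new ObjectMapper().readTree("{\"a\":{\"b\":1},\"c\":2}");
        Map<String, JsonNode> nodes = new HashMap<>();
        parseTree(root, "", nodes);

        // Alias lookup, as in getIgnoreCase(): "newField" was mapped to "a.b" by the parser.
        Map<String, String> rowAndFieldMapping = new HashMap<>();
        rowAndFieldMapping.put("newField", "a.b");
        JsonNode node = nodes.get(rowAndFieldMapping.getOrDefault("newField", "newField"));
        if (node != null) {                   // guard before use; missing fields stay null for the caller
            System.out.println(node);         // prints: 1
        }
    }
}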

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 2 additions & 2 deletions

@@ -84,10 +84,10 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
         FlinkKafkaConsumer09<Row> kafkaSrc;
         if (BooleanUtils.isTrue(kafka09SourceTableInfo.getTopicIsPattern())) {
             kafkaSrc = new CustomerKafka09Consumer(Pattern.compile(topicName),
-                    new CustomerJsonDeserialization(typeInformation), props);
+                    new CustomerJsonDeserialization(typeInformation, kafka09SourceTableInfo.getPhysicalFields()), props);
         } else {
             kafkaSrc = new CustomerKafka09Consumer(topicName,
-                    new CustomerJsonDeserialization(typeInformation), props);
+                    new CustomerJsonDeserialization(typeInformation, kafka09SourceTableInfo.getPhysicalFields()), props);
         }
 
         //earliest,latest
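
Both branches now hand kafka09SourceTableInfo.getPhysicalFields() — the alias-to-path map filled in by dealNestField below — to the deserializer, so pattern-subscribed and fixed-topic consumers resolve nested columns identically. The kafka10 and kafka11 sources receive the same two-line change.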

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 26 additions & 0 deletions

@@ -21,10 +21,14 @@
 package com.dtstack.flink.sql.source.kafka.table;
 
 import com.dtstack.flink.sql.table.AbsSourceParser;
+import com.dtstack.flink.sql.table.SourceTableInfo;
 import com.dtstack.flink.sql.table.TableInfo;
+import com.dtstack.flink.sql.util.ClassUtil;
 import com.dtstack.flink.sql.util.MathUtil;
 
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * Reason:
@@ -35,6 +39,28 @@
 
 public class KafkaSourceParser extends AbsSourceParser {
 
+    private static final String KAFKA_NEST_FIELD_KEY = "nestFieldKey";
+
+    private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)$");
+
+    static {
+        keyPatternMap.put(KAFKA_NEST_FIELD_KEY, kafkaNestFieldKeyPattern);
+
+        keyHandlerMap.put(KAFKA_NEST_FIELD_KEY, KafkaSourceParser::dealNestField);
+    }
+
+    static void dealNestField(Matcher matcher, TableInfo tableInfo) {
+        String physicalField = matcher.group(1);
+        String fieldType = matcher.group(3);
+        String mappingField = matcher.group(4);
+        Class fieldClass = ClassUtil.stringConvertClass(fieldType);
+
+        tableInfo.addPhysicalMappings(mappingField, physicalField);
+        tableInfo.addField(mappingField);
+        tableInfo.addFieldClass(fieldClass);
+        tableInfo.addFieldType(fieldType);
+    }
+
     @Override
     public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
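
A quick standalone check of what kafkaNestFieldKeyPattern accepts and which groups dealNestField reads (the pattern is copied verbatim from the diff; the sample column definition is illustrative):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class NestFieldPatternDemo {
    public static void main(String[] args) {
        Pattern p = Pattern.compile("(?i)((\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)$");
        Matcher m = p.matcher("a.b int as newField"); // (?i) makes "as"/"AS" both match
        if (m.matches()) {
            System.out.println(m.group(1)); // a.b      -> physical (possibly nested) field
            System.out.println(m.group(3)); // int      -> field type
            System.out.println(m.group(4)); // newField -> alias used in the row schema
        }
    }
}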

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 21 additions & 3 deletions

@@ -30,6 +30,7 @@
 import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;
 import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.types.Row;
@@ -80,13 +81,16 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
 
     private Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();
 
+    private Map<String, String> rowAndFieldMapping;
 
-    public CustomerJsonDeserialization(TypeInformation<Row> typeInfo){
+    public CustomerJsonDeserialization(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping){
         this.typeInfo = typeInfo;
 
         this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
 
         this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
+
+        this.rowAndFieldMapping = rowAndFieldMapping;
     }
 
     @Override
@@ -111,7 +115,7 @@ public Row deserialize(byte[] message) throws IOException {
         Row row = new Row(fieldNames.length);
 
         for (int i = 0; i < fieldNames.length; i++) {
-            JsonNode node = nodeAndJsonNodeMapping.get(fieldNames[i]);
+            JsonNode node = getIgnoreCase(fieldNames[i]);
 
             if (node == null) {
                 if (failOnMissingField) {
@@ -133,15 +137,29 @@ public Row deserialize(byte[] message) throws IOException {
             //add metric of dirty data
             dirtyDataCounter.inc();
             return null;
+        } finally {
+            nodeAndJsonNodeMapping.clear();
         }
     }
 
+    public JsonNode getIgnoreCase(String key) {
+        String nodeMappingKey = rowAndFieldMapping.getOrDefault(key, key);
+        JsonNode node = nodeAndJsonNodeMapping.get(nodeMappingKey);
+        JsonNodeType nodeType = node.getNodeType();
+
+        if (nodeType == JsonNodeType.ARRAY) {
+            throw new IllegalStateException("Unsupported type information array.");
+        }
+
+        return node;
+    }
+
+
     public void setFailOnMissingField(boolean failOnMissingField) {
         this.failOnMissingField = failOnMissingField;
     }
 
     private void parseTree(JsonNode jsonNode, String prefix){
-        nodeAndJsonNodeMapping.clear();
 
         Iterator<String> iterator = jsonNode.fieldNames();
         while (iterator.hasNext()){

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 2 additions & 2 deletions

@@ -85,10 +85,10 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
         FlinkKafkaConsumer010<Row> kafkaSrc;
         if (BooleanUtils.isTrue(kafka010SourceTableInfo.getTopicIsPattern())) {
             kafkaSrc = new CustomerKafka010Consumer(Pattern.compile(topicName),
-                    new CustomerJsonDeserialization(typeInformation), props);
+                    new CustomerJsonDeserialization(typeInformation, kafka010SourceTableInfo.getPhysicalFields()), props);
         } else {
             kafkaSrc = new CustomerKafka010Consumer(topicName,
-                    new CustomerJsonDeserialization(typeInformation), props);
+                    new CustomerJsonDeserialization(typeInformation, kafka010SourceTableInfo.getPhysicalFields()), props);
         }
 
         //earliest,latest

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 30 additions & 0 deletions

@@ -22,9 +22,12 @@
 
 import com.dtstack.flink.sql.table.AbsSourceParser;
 import com.dtstack.flink.sql.table.TableInfo;
+import com.dtstack.flink.sql.util.ClassUtil;
 import com.dtstack.flink.sql.util.MathUtil;
 
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * Reason:
@@ -35,6 +38,33 @@
 
 public class KafkaSourceParser extends AbsSourceParser {
 
+    private static final String KAFKA_NEST_FIELD_KEY = "nestFieldKey";
+
+    private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)$");
+
+    static {
+        keyPatternMap.put(KAFKA_NEST_FIELD_KEY, kafkaNestFieldKeyPattern);
+
+        keyHandlerMap.put(KAFKA_NEST_FIELD_KEY, KafkaSourceParser::dealNestField);
+    }
+
+    /**
+     * add parser for alias field
+     * @param matcher   matcher for a nested-field column definition
+     * @param tableInfo table metadata to populate
+     */
+    static void dealNestField(Matcher matcher, TableInfo tableInfo) {
+        String physicalField = matcher.group(1);
+        String fieldType = matcher.group(3);
+        String mappingField = matcher.group(4);
+        Class fieldClass = ClassUtil.stringConvertClass(fieldType);
+
+        tableInfo.addPhysicalMappings(mappingField, physicalField);
+        tableInfo.addField(mappingField);
+        tableInfo.addFieldClass(fieldClass);
+        tableInfo.addFieldType(fieldType);
+    }
+
     @Override
     public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 22 additions & 3 deletions

@@ -30,6 +30,7 @@
 import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;
 import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.types.Row;
@@ -82,12 +83,17 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
 
     private Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();
 
-    public CustomerJsonDeserialization(TypeInformation<Row> typeInfo){
+    private Map<String, String> rowAndFieldMapping;
+
+
+    public CustomerJsonDeserialization(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping){
         this.typeInfo = typeInfo;
 
         this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
 
         this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
+
+        this.rowAndFieldMapping = rowAndFieldMapping;
     }
 
     @Override
@@ -112,7 +118,7 @@ public Row deserialize(byte[] message) throws IOException {
         Row row = new Row(fieldNames.length);
 
         for (int i = 0; i < fieldNames.length; i++) {
-            JsonNode node = nodeAndJsonNodeMapping.get(fieldNames[i]);
+            JsonNode node = getIgnoreCase(fieldNames[i]);
 
             if (node == null) {
                 if (failOnMissingField) {
@@ -134,15 +140,28 @@ public Row deserialize(byte[] message) throws IOException {
             //add metric of dirty data
             dirtyDataCounter.inc();
             return null;
+        } finally {
+            nodeAndJsonNodeMapping.clear();
+        }
+    }
+
+    public JsonNode getIgnoreCase(String key) {
+        String nodeMappingKey = rowAndFieldMapping.getOrDefault(key, key);
+        JsonNode node = nodeAndJsonNodeMapping.get(nodeMappingKey);
+        JsonNodeType nodeType = node.getNodeType();
+
+        if (nodeType == JsonNodeType.ARRAY) {
+            throw new IllegalStateException("Unsupported type information array.");
         }
+
+        return node;
     }
 
     public void setFailOnMissingField(boolean failOnMissingField) {
         this.failOnMissingField = failOnMissingField;
     }
 
     private void parseTree(JsonNode jsonNode, String prefix){
-        nodeAndJsonNodeMapping.clear();
 
         Iterator<String> iterator = jsonNode.fieldNames();
         while (iterator.hasNext()){

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 2 additions & 2 deletions

@@ -85,10 +85,10 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
         FlinkKafkaConsumer011<Row> kafkaSrc;
         if (BooleanUtils.isTrue(kafka011SourceTableInfo.getTopicIsPattern())) {
             kafkaSrc = new CustomerKafka011Consumer(Pattern.compile(topicName),
-                    new CustomerJsonDeserialization(typeInformation), props);
+                    new CustomerJsonDeserialization(typeInformation, kafka011SourceTableInfo.getPhysicalFields()), props);
         } else {
             kafkaSrc = new CustomerKafka011Consumer(topicName,
-                    new CustomerJsonDeserialization(typeInformation), props);
+                    new CustomerJsonDeserialization(typeInformation, kafka011SourceTableInfo.getPhysicalFields()), props);
         }
 
