Commit cfbcd1e (2 parents: 23925a2 + b971db1)

Merge branch '1.5_v3.5.0_nestingJson' into '1.5_v3.5.1'

flink1.5 sql: add support for nested JSON format

See merge request !1

10 files changed: 248 additions, 33 deletions

core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java

Lines changed: 17 additions & 0 deletions
@@ -21,9 +21,11 @@
 package com.dtstack.flink.sql.table;
 
 import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Reason:
@@ -48,6 +50,9 @@ public abstract class TableInfo implements Serializable {
 
     private final List<String> fieldList = Lists.newArrayList();
 
+    /** key: alias, value: real field */
+    private Map<String, String> physicalFields = Maps.newHashMap();
+
     private final List<String> fieldTypeList = Lists.newArrayList();
 
     private final List<Class> fieldClassList = Lists.newArrayList();
@@ -114,6 +119,10 @@ public void addField(String fieldName){
         fieldList.add(fieldName);
     }
 
+    public void addPhysicalMappings(String aliasName, String physicalFieldName){
+        physicalFields.put(aliasName, physicalFieldName);
+    }
+
     public void addFieldClass(Class fieldClass){
         fieldClassList.add(fieldClass);
     }
@@ -146,6 +155,14 @@ public List<Class> getFieldClassList() {
         return fieldClassList;
     }
 
+    public Map<String, String> getPhysicalFields() {
+        return physicalFields;
+    }
+
+    public void setPhysicalFields(Map<String, String> physicalFields) {
+        this.physicalFields = physicalFields;
+    }
+
     public void finish(){
         this.fields = fieldList.toArray(new String[fieldList.size()]);
         this.fieldClasses = fieldClassList.toArray(new Class[fieldClassList.size()]);
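
Note: the new mapping stores the SQL-facing alias against the dotted physical path it resolves to. A minimal standalone sketch of the behavior (the class name and sample fields are hypothetical, not part of the commit):

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical stand-in isolating TableInfo's alias-to-physical-field mapping.
    public class PhysicalFieldMappingSketch {

        private final Map<String, String> physicalFields = new HashMap<>();

        // Mirrors TableInfo.addPhysicalMappings(aliasName, physicalFieldName).
        public void addPhysicalMappings(String aliasName, String physicalFieldName) {
            physicalFields.put(aliasName, physicalFieldName);
        }

        public static void main(String[] args) {
            PhysicalFieldMappingSketch info = new PhysicalFieldMappingSketch();
            // "userId" is the alias visible to SQL; "user.id" is the dotted path into the JSON payload.
            info.addPhysicalMappings("userId", "user.id");

            // Consumers fall back to the field name itself when no alias is registered.
            System.out.println(info.physicalFields.getOrDefault("userId", "userId")); // user.id
            System.out.println(info.physicalFields.getOrDefault("name", "name"));     // name
        }
    }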

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 47 additions & 9 deletions
@@ -25,9 +25,12 @@
 import com.dtstack.flink.sql.source.kafka.metric.KafkaTopicPartitionLagMetric;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;
 import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.types.Row;
@@ -40,6 +43,7 @@
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
 
 import static com.dtstack.flink.sql.metric.MetricConstant.*;
@@ -75,12 +79,19 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
 
     private boolean firstMsg = true;
 
-    public CustomerJsonDeserialization(TypeInformation<Row> typeInfo){
+    private Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();
+
+    private Map<String, String> rowAndFieldMapping;
+
+
+    public CustomerJsonDeserialization(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping){
         this.typeInfo = typeInfo;
 
         this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
 
         this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
+
+        this.rowAndFieldMapping = rowAndFieldMapping;
     }
 
     @Override
@@ -101,9 +112,11 @@ public Row deserialize(byte[] message) throws IOException {
             numInBytes.inc(message.length);
 
             JsonNode root = objectMapper.readTree(message);
+            parseTree(root, null);
             Row row = new Row(fieldNames.length);
+
             for (int i = 0; i < fieldNames.length; i++) {
-                JsonNode node = getIgnoreCase(root, fieldNames[i]);
+                JsonNode node = getIgnoreCase(fieldNames[i]);
 
                 if (node == null) {
                     if (failOnMissingField) {
@@ -125,25 +138,50 @@ public Row deserialize(byte[] message) throws IOException {
             //add metric of dirty data
             dirtyDataCounter.inc();
             return null;
+        } finally {
+            nodeAndJsonNodeMapping.clear();
         }
     }
 
     public void setFailOnMissingField(boolean failOnMissingField) {
         this.failOnMissingField = failOnMissingField;
     }
 
-    public JsonNode getIgnoreCase(JsonNode jsonNode, String key) {
+    private JsonNode getIgnoreCase(String key) {
+        String nodeMappingKey = rowAndFieldMapping.getOrDefault(key, key);
+        JsonNode node = nodeAndJsonNodeMapping.get(nodeMappingKey);
+        JsonNodeType nodeType = node.getNodeType();
+
+        if (nodeType == JsonNodeType.ARRAY){
+            throw new IllegalStateException("Unsupported type information array.");
+        }
+
+        return node;
+    }
+
+
+    private void parseTree(JsonNode jsonNode, String prefix){
 
-        Iterator<String> iter = jsonNode.fieldNames();
-        while (iter.hasNext()) {
-            String key1 = iter.next();
-            if (key1.equalsIgnoreCase(key)) {
-                return jsonNode.get(key1);
+        Iterator<String> iterator = jsonNode.fieldNames();
+        while (iterator.hasNext()){
+            String next = iterator.next();
+            JsonNode child = jsonNode.get(next);
+            String nodeKey = getNodeKey(prefix, next);
+
+            if (child.isValueNode()){
+                nodeAndJsonNodeMapping.put(nodeKey, child);
+            } else {
+                parseTree(child, nodeKey);
             }
         }
+    }
 
-        return null;
+    private String getNodeKey(String prefix, String nodeName){
+        if (Strings.isNullOrEmpty(prefix)){
+            return nodeName;
+        }
 
+        return prefix + "." + nodeName;
     }
 
     public void setFetcher(AbstractFetcher<Row, ?> fetcher) {
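
Note: parseTree/getNodeKey flatten the incoming JSON tree into dotted keys before any field lookup happens. A minimal standalone sketch of the same idea, using plain Jackson instead of Flink's shaded copy (the sample payload is invented for illustration):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    public class NestedJsonFlattenDemo {

        // Walks the tree and collects value nodes under dotted keys, e.g. "user.id".
        static void parseTree(JsonNode jsonNode, String prefix, Map<String, JsonNode> out) {
            Iterator<String> names = jsonNode.fieldNames();
            while (names.hasNext()) {
                String name = names.next();
                JsonNode child = jsonNode.get(name);
                String nodeKey = (prefix == null || prefix.isEmpty()) ? name : prefix + "." + name;
                if (child.isValueNode()) {
                    out.put(nodeKey, child);
                } else {
                    parseTree(child, nodeKey, out);
                }
            }
        }

        public static void main(String[] args) throws Exception {
            JsonNode root = new ObjectMapper()
                    .readTree("{\"user\":{\"id\":42,\"name\":\"bob\"},\"ts\":1}");
            Map<String, JsonNode> mapping = new HashMap<>();
            parseTree(root, null, mapping);
            // e.g. {ts=1, user.id=42, user.name="bob"} (HashMap order not guaranteed)
            System.out.println(mapping);
        }
    }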

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 2 additions & 2 deletions
@@ -84,10 +84,10 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
         FlinkKafkaConsumer09<Row> kafkaSrc;
         if (BooleanUtils.isTrue(kafka09SourceTableInfo.getTopicIsPattern())) {
             kafkaSrc = new CustomerKafka09Consumer(Pattern.compile(topicName),
-                    new CustomerJsonDeserialization(typeInformation), props);
+                    new CustomerJsonDeserialization(typeInformation, kafka09SourceTableInfo.getPhysicalFields()), props);
         } else {
             kafkaSrc = new CustomerKafka09Consumer(topicName,
-                    new CustomerJsonDeserialization(typeInformation), props);
+                    new CustomerJsonDeserialization(typeInformation, kafka09SourceTableInfo.getPhysicalFields()), props);
         }
 
         //earliest,latest

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 26 additions & 0 deletions
@@ -21,10 +21,14 @@
 package com.dtstack.flink.sql.source.kafka.table;
 
 import com.dtstack.flink.sql.table.AbsSourceParser;
+import com.dtstack.flink.sql.table.SourceTableInfo;
 import com.dtstack.flink.sql.table.TableInfo;
+import com.dtstack.flink.sql.util.ClassUtil;
 import com.dtstack.flink.sql.util.MathUtil;
 
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * Reason:
@@ -35,6 +39,28 @@
 
 public class KafkaSourceParser extends AbsSourceParser {
 
+    private static final String KAFKA_NEST_FIELD_KEY = "nestFieldKey";
+
+    private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)$");
+
+    static {
+        keyPatternMap.put(KAFKA_NEST_FIELD_KEY, kafkaNestFieldKeyPattern);
+
+        keyHandlerMap.put(KAFKA_NEST_FIELD_KEY, KafkaSourceParser::dealNestField);
+    }
+
+    static void dealNestField(Matcher matcher, TableInfo tableInfo) {
+        String physicalField = matcher.group(1);
+        String fieldType = matcher.group(3);
+        String mappingField = matcher.group(4);
+        Class fieldClass = ClassUtil.stringConvertClass(fieldType);
+
+        tableInfo.addPhysicalMappings(mappingField, physicalField);
+        tableInfo.addField(mappingField);
+        tableInfo.addFieldClass(fieldClass);
+        tableInfo.addFieldType(fieldType);
+    }
+
     @Override
     public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 46 additions & 9 deletions
@@ -25,9 +25,12 @@
 import com.dtstack.flink.sql.source.kafka.metric.KafkaTopicPartitionLagMetric;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;
 import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.types.Row;
@@ -40,6 +43,7 @@
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
 
 import static com.dtstack.flink.sql.metric.MetricConstant.*;
@@ -75,12 +79,18 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
 
     private boolean firstMsg = true;
 
-    public CustomerJsonDeserialization(TypeInformation<Row> typeInfo){
+    private Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();
+
+    private Map<String, String> rowAndFieldMapping;
+
+    public CustomerJsonDeserialization(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping){
         this.typeInfo = typeInfo;
 
         this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
 
         this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
+
+        this.rowAndFieldMapping = rowAndFieldMapping;
     }
 
     @Override
@@ -101,9 +111,11 @@ public Row deserialize(byte[] message) throws IOException {
             numInBytes.inc(message.length);
 
             JsonNode root = objectMapper.readTree(message);
+            parseTree(root, null);
             Row row = new Row(fieldNames.length);
+
             for (int i = 0; i < fieldNames.length; i++) {
-                JsonNode node = getIgnoreCase(root, fieldNames[i]);
+                JsonNode node = getIgnoreCase(fieldNames[i]);
 
                 if (node == null) {
                     if (failOnMissingField) {
@@ -125,25 +137,50 @@ public Row deserialize(byte[] message) throws IOException {
             //add metric of dirty data
             dirtyDataCounter.inc();
             return null;
+        } finally {
+            nodeAndJsonNodeMapping.clear();
        }
     }
 
+    public JsonNode getIgnoreCase(String key) {
+        String nodeMappingKey = rowAndFieldMapping.getOrDefault(key, key);
+        JsonNode node = nodeAndJsonNodeMapping.get(nodeMappingKey);
+        JsonNodeType nodeType = node.getNodeType();
+
+        if (nodeType == JsonNodeType.ARRAY){
+            throw new IllegalStateException("Unsupported type information array.");
+        }
+
+        return node;
+    }
+
+
     public void setFailOnMissingField(boolean failOnMissingField) {
         this.failOnMissingField = failOnMissingField;
     }
 
-    public JsonNode getIgnoreCase(JsonNode jsonNode, String key) {
+    private void parseTree(JsonNode jsonNode, String prefix){
 
-        Iterator<String> iter = jsonNode.fieldNames();
-        while (iter.hasNext()) {
-            String key1 = iter.next();
-            if (key1.equalsIgnoreCase(key)) {
-                return jsonNode.get(key1);
+        Iterator<String> iterator = jsonNode.fieldNames();
+        while (iterator.hasNext()){
+            String next = iterator.next();
+            JsonNode child = jsonNode.get(next);
+            String nodeKey = getNodeKey(prefix, next);
+
+            if (child.isValueNode()){
+                nodeAndJsonNodeMapping.put(nodeKey, child);
+            } else {
+                parseTree(child, nodeKey);
             }
         }
+    }
 
-        return null;
+    private String getNodeKey(String prefix, String nodeName){
+        if (Strings.isNullOrEmpty(prefix)){
+            return nodeName;
+        }
 
+        return prefix + "." + nodeName;
     }
 
     public void setFetcher(AbstractFetcher<Row, ?> fetcher) {
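
Note: array-valued nodes are rejected at lookup time rather than flattened. A small isolated sketch of the guard (the payload is invented; the real code throws from getIgnoreCase):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.JsonNodeType;

    public class ArrayRejectionDemo {
        public static void main(String[] args) throws Exception {
            JsonNode root = new ObjectMapper().readTree("{\"tags\":[\"a\",\"b\"]}");
            JsonNode node = root.get("tags");
            // Mirrors the guard in getIgnoreCase: array-typed nodes are unsupported,
            // and the deserializer raises IllegalStateException for them.
            if (node.getNodeType() == JsonNodeType.ARRAY) {
                System.out.println("array field rejected: " + node);
            }
        }
    }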

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 2 additions & 2 deletions
@@ -85,10 +85,10 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
         FlinkKafkaConsumer010<Row> kafkaSrc;
         if (BooleanUtils.isTrue(kafka010SourceTableInfo.getTopicIsPattern())) {
             kafkaSrc = new CustomerKafka010Consumer(Pattern.compile(topicName),
-                    new CustomerJsonDeserialization(typeInformation), props);
+                    new CustomerJsonDeserialization(typeInformation, kafka010SourceTableInfo.getPhysicalFields()), props);
         } else {
             kafkaSrc = new CustomerKafka010Consumer(topicName,
-                    new CustomerJsonDeserialization(typeInformation), props);
+                    new CustomerJsonDeserialization(typeInformation, kafka010SourceTableInfo.getPhysicalFields()), props);
         }
 
         //earliest,latest

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 30 additions & 0 deletions
@@ -22,9 +22,12 @@
 
 import com.dtstack.flink.sql.table.AbsSourceParser;
 import com.dtstack.flink.sql.table.TableInfo;
+import com.dtstack.flink.sql.util.ClassUtil;
 import com.dtstack.flink.sql.util.MathUtil;
 
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * Reason:
@@ -35,6 +38,33 @@
 
 public class KafkaSourceParser extends AbsSourceParser {
 
+    private static final String KAFKA_NEST_FIELD_KEY = "nestFieldKey";
+
+    private static Pattern kafkaNestFieldKeyPattern = Pattern.compile("(?i)((\\w+\\.)*\\w+)\\s+(\\w+)\\s+AS\\s+(\\w+)$");
+
+    static {
+        keyPatternMap.put(KAFKA_NEST_FIELD_KEY, kafkaNestFieldKeyPattern);
+
+        keyHandlerMap.put(KAFKA_NEST_FIELD_KEY, KafkaSourceParser::dealNestField);
+    }
+
+    /**
+     * Add parsing for aliased nested fields.
+     * @param matcher regex match over a single field definition
+     * @param tableInfo table metadata to populate
+     */
+    static void dealNestField(Matcher matcher, TableInfo tableInfo) {
+        String physicalField = matcher.group(1);
+        String fieldType = matcher.group(3);
+        String mappingField = matcher.group(4);
+        Class fieldClass = ClassUtil.stringConvertClass(fieldType);
+
+        tableInfo.addPhysicalMappings(mappingField, physicalField);
+        tableInfo.addField(mappingField);
+        tableInfo.addFieldClass(fieldClass);
+        tableInfo.addFieldType(fieldType);
+    }
+
     @Override
     public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
4070
