Skip to content

Commit 0334320

Browse files
committed
添加对解析json嵌套结构的支持 (Add support for parsing nested JSON structures)
1 parent 23925a2 commit 0334320

File tree

3 files changed

+81
-24
lines changed

3 files changed

+81
-24
lines changed

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
import com.dtstack.flink.sql.source.kafka.metric.KafkaTopicPartitionLagMetric;
2626
import org.apache.flink.api.common.typeinfo.TypeInformation;
2727
import org.apache.flink.api.java.typeutils.RowTypeInfo;
28+
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
2829
import org.apache.flink.metrics.MetricGroup;
30+
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
2931
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
3032
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
3133
import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
@@ -40,6 +42,7 @@
4042
import java.io.IOException;
4143
import java.lang.reflect.Field;
4244
import java.util.Iterator;
45+
import java.util.Map;
4346
import java.util.Set;
4447

4548
import static com.dtstack.flink.sql.metric.MetricConstant.*;
@@ -75,6 +78,9 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
7578

7679
private boolean firstMsg = true;
7780

81+
private Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();
82+
83+
7884
public CustomerJsonDeserialization(TypeInformation<Row> typeInfo){
7985
this.typeInfo = typeInfo;
8086

@@ -101,9 +107,11 @@ public Row deserialize(byte[] message) throws IOException {
101107
numInBytes.inc(message.length);
102108

103109
JsonNode root = objectMapper.readTree(message);
110+
parseTree(root, null);
104111
Row row = new Row(fieldNames.length);
112+
105113
for (int i = 0; i < fieldNames.length; i++) {
106-
JsonNode node = getIgnoreCase(root, fieldNames[i]);
114+
JsonNode node = nodeAndJsonNodeMapping.get(fieldNames[i]);
107115

108116
if (node == null) {
109117
if (failOnMissingField) {
@@ -132,18 +140,30 @@ public void setFailOnMissingField(boolean failOnMissingField) {
132140
this.failOnMissingField = failOnMissingField;
133141
}
134142

135-
public JsonNode getIgnoreCase(JsonNode jsonNode, String key) {
136143

137-
Iterator<String> iter = jsonNode.fieldNames();
138-
while (iter.hasNext()) {
139-
String key1 = iter.next();
140-
if (key1.equalsIgnoreCase(key)) {
141-
return jsonNode.get(key1);
144+
private void parseTree(JsonNode jsonNode, String prefix){
145+
nodeAndJsonNodeMapping.clear();
146+
147+
Iterator<String> iterator = jsonNode.fieldNames();
148+
while (iterator.hasNext()){
149+
String next = iterator.next();
150+
JsonNode child = jsonNode.get(next);
151+
String nodeKey = getNodeKey(prefix, next);
152+
153+
if (child.isValueNode()){
154+
nodeAndJsonNodeMapping.put(nodeKey, child);
155+
}else {
156+
parseTree(child, nodeKey);
142157
}
143158
}
159+
}
144160

145-
return null;
161+
private String getNodeKey(String prefix, String nodeName){
162+
if(Strings.isNullOrEmpty(prefix)){
163+
return nodeName;
164+
}
146165

166+
return prefix + "." + nodeName;
147167
}
148168

149169
public void setFetcher(AbstractFetcher<Row, ?> fetcher) {

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
import com.dtstack.flink.sql.source.kafka.metric.KafkaTopicPartitionLagMetric;
2626
import org.apache.flink.api.common.typeinfo.TypeInformation;
2727
import org.apache.flink.api.java.typeutils.RowTypeInfo;
28+
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
2829
import org.apache.flink.metrics.MetricGroup;
30+
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
2931
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
3032
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
3133
import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
@@ -40,6 +42,7 @@
4042
import java.io.IOException;
4143
import java.lang.reflect.Field;
4244
import java.util.Iterator;
45+
import java.util.Map;
4346
import java.util.Set;
4447

4548
import static com.dtstack.flink.sql.metric.MetricConstant.*;
@@ -75,6 +78,9 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
7578

7679
private boolean firstMsg = true;
7780

81+
private Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();
82+
83+
7884
public CustomerJsonDeserialization(TypeInformation<Row> typeInfo){
7985
this.typeInfo = typeInfo;
8086

@@ -101,9 +107,11 @@ public Row deserialize(byte[] message) throws IOException {
101107
numInBytes.inc(message.length);
102108

103109
JsonNode root = objectMapper.readTree(message);
110+
parseTree(root, null);
104111
Row row = new Row(fieldNames.length);
112+
105113
for (int i = 0; i < fieldNames.length; i++) {
106-
JsonNode node = getIgnoreCase(root, fieldNames[i]);
114+
JsonNode node = nodeAndJsonNodeMapping.get(fieldNames[i]);
107115

108116
if (node == null) {
109117
if (failOnMissingField) {
@@ -132,18 +140,29 @@ public void setFailOnMissingField(boolean failOnMissingField) {
132140
this.failOnMissingField = failOnMissingField;
133141
}
134142

135-
public JsonNode getIgnoreCase(JsonNode jsonNode, String key) {
143+
private void parseTree(JsonNode jsonNode, String prefix){
144+
nodeAndJsonNodeMapping.clear();
136145

137-
Iterator<String> iter = jsonNode.fieldNames();
138-
while (iter.hasNext()) {
139-
String key1 = iter.next();
140-
if (key1.equalsIgnoreCase(key)) {
141-
return jsonNode.get(key1);
146+
Iterator<String> iterator = jsonNode.fieldNames();
147+
while (iterator.hasNext()){
148+
String next = iterator.next();
149+
JsonNode child = jsonNode.get(next);
150+
String nodeKey = getNodeKey(prefix, next);
151+
152+
if (child.isValueNode()){
153+
nodeAndJsonNodeMapping.put(nodeKey, child);
154+
}else {
155+
parseTree(child, nodeKey);
142156
}
143157
}
158+
}
144159

145-
return null;
160+
private String getNodeKey(String prefix, String nodeName){
161+
if(Strings.isNullOrEmpty(prefix)){
162+
return nodeName;
163+
}
146164

165+
return prefix + "." + nodeName;
147166
}
148167

149168
public void setFetcher(AbstractFetcher<Row, ?> fetcher) {

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
import com.dtstack.flink.sql.source.kafka.metric.KafkaTopicPartitionLagMetric;
2626
import org.apache.flink.api.common.typeinfo.TypeInformation;
2727
import org.apache.flink.api.java.typeutils.RowTypeInfo;
28+
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
2829
import org.apache.flink.metrics.MetricGroup;
30+
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
2931
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
3032
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
3133
import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
@@ -40,6 +42,7 @@
4042
import java.io.IOException;
4143
import java.lang.reflect.Field;
4244
import java.util.Iterator;
45+
import java.util.Map;
4346
import java.util.Set;
4447

4548
import static com.dtstack.flink.sql.metric.MetricConstant.DT_PARTITION_GROUP;
@@ -77,6 +80,8 @@ public class CustomerJsonDeserialization extends AbsDeserialization<Row> {
7780

7881
private boolean firstMsg = true;
7982

83+
private Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();
84+
8085
public CustomerJsonDeserialization(TypeInformation<Row> typeInfo){
8186
this.typeInfo = typeInfo;
8287

@@ -103,9 +108,11 @@ public Row deserialize(byte[] message) throws IOException {
103108
numInBytes.inc(message.length);
104109

105110
JsonNode root = objectMapper.readTree(message);
111+
parseTree(root, null);
106112
Row row = new Row(fieldNames.length);
113+
107114
for (int i = 0; i < fieldNames.length; i++) {
108-
JsonNode node = getIgnoreCase(root, fieldNames[i]);
115+
JsonNode node = nodeAndJsonNodeMapping.get(fieldNames[i]);
109116

110117
if (node == null) {
111118
if (failOnMissingField) {
@@ -134,18 +141,29 @@ public void setFailOnMissingField(boolean failOnMissingField) {
134141
this.failOnMissingField = failOnMissingField;
135142
}
136143

137-
public JsonNode getIgnoreCase(JsonNode jsonNode, String key) {
144+
private void parseTree(JsonNode jsonNode, String prefix){
145+
nodeAndJsonNodeMapping.clear();
146+
147+
Iterator<String> iterator = jsonNode.fieldNames();
148+
while (iterator.hasNext()){
149+
String next = iterator.next();
150+
JsonNode child = jsonNode.get(next);
151+
String nodeKey = getNodeKey(prefix, next);
138152

139-
Iterator<String> iter = jsonNode.fieldNames();
140-
while (iter.hasNext()) {
141-
String key1 = iter.next();
142-
if (key1.equalsIgnoreCase(key)) {
143-
return jsonNode.get(key1);
153+
if (child.isValueNode()){
154+
nodeAndJsonNodeMapping.put(nodeKey, child);
155+
}else {
156+
parseTree(child, nodeKey);
144157
}
145158
}
159+
}
146160

147-
return null;
161+
private String getNodeKey(String prefix, String nodeName){
162+
if(Strings.isNullOrEmpty(prefix)){
163+
return nodeName;
164+
}
148165

166+
return prefix + "." + nodeName;
149167
}
150168

151169
public void setFetcher(AbstractFetcher<Row, ?> fetcher) {

0 commit comments

Comments
 (0)