Skip to content

Commit 02de576

Browse files
committed
json array 的读取为text
1 parent 8c44b4d commit 02de576

File tree

3 files changed

+9
-0
lines changed

3 files changed

+9
-0
lines changed

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
3232
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
3333
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;
34+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
3435
import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
3536
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
3637
import org.apache.flink.types.Row;
@@ -174,6 +175,8 @@ private void parseTree(JsonNode jsonNode, String prefix){
174175

175176
if (child.isValueNode()){
176177
nodeAndJsonNodeMapping.put(nodeKey, child);
178+
} else if(child.isArray()){
179+
nodeAndJsonNodeMapping.put(nodeKey, new TextNode(child.toString()));
177180
}else {
178181
parseTree(child, nodeKey);
179182
}

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
3232
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
3333
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;
34+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
3435
import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
3536
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
3637
import org.apache.flink.types.Row;
@@ -174,6 +175,8 @@ private void parseTree(JsonNode jsonNode, String prefix){
174175

175176
if (child.isValueNode()){
176177
nodeAndJsonNodeMapping.put(nodeKey, child);
178+
}else if(child.isArray()){
179+
nodeAndJsonNodeMapping.put(nodeKey, new TextNode(child.toString()));
177180
}else {
178181
parseTree(child, nodeKey);
179182
}

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
3232
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
3333
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;
34+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
3435
import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
3536
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
3637
import org.apache.flink.types.Row;
@@ -175,6 +176,8 @@ private void parseTree(JsonNode jsonNode, String prefix){
175176

176177
if (child.isValueNode()){
177178
nodeAndJsonNodeMapping.put(nodeKey, child);
179+
}else if(child.isArray()){
180+
nodeAndJsonNodeMapping.put(nodeKey, new TextNode(child.toString()));
178181
}else {
179182
parseTree(child, nodeKey);
180183
}

0 commit comments

Comments
 (0)