Skip to content

Commit 2b5dd49

Browse files
committed
handle json array type when get node
1 parent 10d9046 commit 2b5dd49

File tree

1 file changed

+12
-9
lines changed

1 file changed

+12
-9
lines changed

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/deserialization/CustomerJsonDeserialization.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
3535
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
3636
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
37+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeType;
3738
import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
3839
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
3940
import org.apache.flink.types.Row;
@@ -124,7 +125,7 @@ public Row deserialize(byte[] message) {
124125
Row row = new Row(fieldNames.length);
125126

126127
for (int i = 0; i < fieldNames.length; i++) {
127-
JsonNode node = getIgnoreCase(root, fieldNames[i]);
128+
JsonNode node = getIgnoreCase(fieldNames[i]);
128129

129130
if (node == null) {
130131
if (failOnMissingField) {
@@ -168,9 +169,16 @@ public void setFailOnMissingField(boolean failOnMissingField) {
168169
}
169170

170171

171-
public JsonNode getIgnoreCase(JsonNode jsonNode, String key) {
172+
public JsonNode getIgnoreCase(String key) {
172173
String nodeMappingKey = rowAndFieldMapping.get(key);
173-
return nodeAndJsonnodeMapping.get(nodeMappingKey);
174+
JsonNode node = nodeAndJsonnodeMapping.get(nodeMappingKey);
175+
JsonNodeType nodeType = node.getNodeType();
176+
177+
if (nodeType==JsonNodeType.ARRAY){
178+
throw new IllegalStateException("Unsupported type information array .") ;
179+
}
180+
181+
return node;
174182
}
175183

176184
public void setFetcher(AbstractFetcher<Row, ?> fetcher) {
@@ -236,12 +244,7 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
236244
} else if (info.getTypeClass().equals(Types.SQL_TIMESTAMP.getTypeClass())) {
237245
// local zone
238246
return Timestamp.valueOf(node.asText());
239-
} else if (info instanceof ObjectArrayTypeInfo) {
240-
throw new IllegalStateException("Unsupported type information '" + info + "' for node: " + node);
241-
} else if (info instanceof BasicArrayTypeInfo) {
242-
throw new IllegalStateException("Unsupported type information '" + info + "' for node: " + node);
243-
} else if (info instanceof PrimitiveArrayTypeInfo &&
244-
((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
247+
} else if (info.getTypeClass().equals(Types.BYTE.getTypeClass())){
245248
return convertByteArray(node);
246249
} else {
247250
// for types that were specified without JSON schema

0 commit comments

Comments
 (0)