3030import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .core .JsonProcessingException ;
3131import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .JsonNode ;
3232import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .ObjectMapper ;
33+ import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .node .ArrayNode ;
3334import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .node .JsonNodeType ;
3435import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .node .TextNode ;
3536import org .apache .flink .streaming .connectors .kafka .internal .KafkaConsumerThread ;
@@ -157,21 +158,27 @@ public Row deserialize(byte[] message) throws IOException {
157158
158159 public JsonNode getIgnoreCase (String key ) {
159160 String nodeMappingKey = rowAndFieldMapping .getOrDefault (key , key );
160- JsonNode node = nodeAndJsonNodeMapping .get (nodeMappingKey );
161- if (node == null ){
162- return null ;
163- }
164-
165- JsonNodeType nodeType = node .getNodeType ();
166-
167- if (nodeType ==JsonNodeType .ARRAY ){
168- throw new IllegalStateException ("Unsupported type information array ." ) ;
169- }
170-
171- return node ;
161+ return nodeAndJsonNodeMapping .get (nodeMappingKey );
172162 }
173163
174164 private void parseTree (JsonNode jsonNode , String prefix ){
165+ if (jsonNode .isArray ()) {
166+ ArrayNode array = (ArrayNode ) jsonNode ;
167+ for (int i = 0 ; i < array .size (); i ++) {
168+ JsonNode child = array .get (i );
169+ String nodeKey = getNodeKey (prefix , i );
170+
171+ if (child .isValueNode ()) {
172+ nodeAndJsonNodeMapping .put (nodeKey , child );
173+ } else {
174+ if (rowAndFieldMapping .containsValue (nodeKey )) {
175+ nodeAndJsonNodeMapping .put (nodeKey , child );
176+ }
177+ parseTree (child , nodeKey );
178+ }
179+ }
180+ return ;
181+ }
175182
176183 Iterator <String > iterator = jsonNode .fieldNames ();
177184 while (iterator .hasNext ()){
@@ -182,7 +189,7 @@ private void parseTree(JsonNode jsonNode, String prefix){
182189 if (child .isValueNode ()){
183190 nodeAndJsonNodeMapping .put (nodeKey , child );
184191 }else if (child .isArray ()){
185- nodeAndJsonNodeMapping . put ( nodeKey , new TextNode ( child . toString ()) );
192+ parseTree ( child , nodeKey );
186193 }else {
187194 parseTree (child , nodeKey );
188195 }
@@ -197,6 +204,13 @@ private String getNodeKey(String prefix, String nodeName){
197204 return prefix + "." + nodeName ;
198205 }
199206
207+ private String getNodeKey (String prefix , int i ) {
208+ if (Strings .isNullOrEmpty (prefix )) {
209+ return "[" + i + "]" ;
210+ }
211+ return prefix + "[" + i + "]" ;
212+ }
213+
200214 public void setFetcher (AbstractFetcher <Row , ?> fetcher ) {
201215 this .fetcher = fetcher ;
202216 }
0 commit comments