2626import com .dtstack .flinkx .restapi .common .handler .ResponseRetryException ;
2727import com .dtstack .flinkx .restapi .common .httprequestApi ;
2828import com .dtstack .flinkx .util .ExceptionUtil ;
29+ import com .dtstack .flinkx .util .GsonUtil ;
2930import org .apache .commons .collections .CollectionUtils ;
3031import org .apache .flink .types .Row ;
3132import org .apache .http .HttpEntity ;
3637import org .slf4j .Logger ;
3738import org .slf4j .LoggerFactory ;
3839
39- import java .util .HashMap ;
40- import java .util .List ;
41- import java .util .Map ;
40+ import java .util .*;
4241import java .util .concurrent .*;
4342import java .util .concurrent .atomic .AtomicInteger ;
4443
@@ -59,7 +58,7 @@ public class HttpClient {
5958 private AtomicInteger atomicInteger = new AtomicInteger (0 );
6059 private static final String THREAD_NAME = "restApiReader-thread" ;
6160 private List <MetaColumn > metaColumns ;
62- private List <Handler > handlers ;
61+ private List <Handler > handlers = new ArrayList <>( 2 ) ;
6362
6463
6564 public HttpClient (RestContext restContext , Long intervalTime , List <MetaColumn > metaColumns ) {
@@ -122,13 +121,15 @@ public void doExecute(httprequestApi.Httprequest build, int retryTime) {
122121 }
123122 if (CollectionUtils .isEmpty (metaColumns ) || (metaColumns .size () == 1 && metaColumns .get (0 ).getName ().equals (ConstantValue .STAR_SYMBOL ))) {
124123 queue .put (Row .of (map ));
125- }else {
124+ } else {
126125 HashMap <String , Object > stringObjectHashMap = new HashMap <>();
127126 for (MetaColumn metaColumn : metaColumns ) {
128127 String [] names = metaColumn .getName ().split ("\\ ." );
129128 Map <String , Object > keyToMap = initData (stringObjectHashMap , names );
130- Object data = getData (map , names );
131- keyToMap .put (names [names .length - 1 ], data );
129+ if (Objects .nonNull (keyToMap )) {
130+ Object data = getData (map , names );
131+ keyToMap .put (names [names .length - 1 ], data );
132+ }
132133 }
133134 queue .put (Row .of (stringObjectHashMap ));
134135 }
@@ -156,26 +157,46 @@ public void close() {
156157 }
157158
158159 public Map <String , Object > initData (HashMap <String , Object > data , String [] names ) {
159- HashMap <String , Object > tempHashMap = data ;
160+ Map <String , Object > tempHashMap = data ;
160161 for (int i = 0 ; i < names .length ; i ++) {
161162 if (i != names .length - 1 ) {
162163 HashMap <String , Object > objectObjectHashMap = new HashMap <String , Object >(4 );
163- tempHashMap .putIfAbsent (names [i ], objectObjectHashMap );
164- tempHashMap = objectObjectHashMap ;
164+ Object value = tempHashMap .putIfAbsent (names [i ], objectObjectHashMap );
165+ if (value instanceof String ) {
166+ try {
167+ Map o = GsonUtil .GSON .fromJson ((String ) value , GsonUtil .gsonMapTypeToken );
168+ tempHashMap .put (names [i ], o );
169+ tempHashMap = o ;
170+ } catch (Exception e ) {
171+ return null ;
172+ }
173+ } else if (value instanceof Map ) {
174+ tempHashMap = (Map ) value ;
175+ } else {
176+ return null ;
177+ }
165178 } else {
166- tempHashMap .putIfAbsent (names [i ], null );
179+ tempHashMap .put (names [i ], null );
167180 }
168181 }
169182 return tempHashMap ;
170183 }
171184
172185 public Object getData (Map <String , Object > data , String [] names ) {
173- //metaColumns有可能为空 或者 有可能为*
174186 Map <String , Object > tempHashMap = data ;
175187 for (int i = 0 ; i < names .length ; i ++) {
176188 if (tempHashMap .containsKey (names [i ]) && i != names .length - 1 ) {
189+ if (Objects .isNull (tempHashMap .get (names [i ]))) {
190+ return null ;
191+ }
177192 if (tempHashMap .get (names [i ]) instanceof Map ) {
178193 tempHashMap = (Map <String , Object >) tempHashMap .get (names [i ]);
194+ } else if (tempHashMap .get (names [i ]) instanceof String ) {
195+ try {
196+ tempHashMap = GsonUtil .GSON .fromJson ((String ) tempHashMap .get (names [i ]), GsonUtil .gsonMapTypeToken );
197+ } catch (Exception e ) {
198+ return null ;
199+ }
179200 } else {
180201 return null ;
181202 }
0 commit comments