2222import com .dtstack .flinkx .restapi .common .HttpUtil ;
2323import com .dtstack .flinkx .util .ExceptionUtil ;
2424import com .google .common .collect .Maps ;
25+ import com .google .gson .Gson ;
26+ import com .google .gson .GsonBuilder ;
2527import org .apache .flink .types .Row ;
2628import org .apache .http .HttpStatus ;
2729import org .apache .http .client .config .RequestConfig ;
@@ -57,9 +59,12 @@ public class RestapiOutputFormat extends BaseRichOutputFormat {
5759
5860 protected static final int DEFAULT_TIME_OUT = 300000 ;
5961
62+ protected Gson gson ;
63+
6064 @ Override
6165 protected void openInternal (int taskNumber , int numTasks ) throws IOException {
6266 params .put ("threadId" , UUID .randomUUID ().toString ().substring (0 , 8 ));
67+ gson = new GsonBuilder ().serializeNulls ().create ();
6368 }
6469
6570 @ Override
@@ -80,7 +85,7 @@ protected void writeSingleRecordInternal(Row row) throws WriteRecordException {
8085 }
8186 body .put ("data" , dataRow );
8287 requestBody .put ("json" , body );
83- LOG .debug ("当前发送的数据为:{}" , HttpUtil . gson .toJson (requestBody ));
88+ LOG .debug ("当前发送的数据为:{}" , gson .toJson (requestBody ));
8489 sendRequest (httpClient , requestBody , method , header , url );
8590 } catch (Exception e ) {
8691 requestErrorMessage (e , index , row );
@@ -111,7 +116,7 @@ protected void writeMultipleRecordsInternal() throws Exception {
111116 }
112117 body .put ("data" , dataRow );
113118 requestBody .put ("json" , body );
114- LOG .debug ("当前发送的数据为:{}" , HttpUtil . gson .toJson (requestBody ));
119+ LOG .debug ("当前发送的数据为:{}" , gson .toJson (requestBody ));
115120 sendRequest (httpClient , requestBody , method , header , url );
116121 } catch (Exception e ) {
117122 LOG .error (ExceptionUtil .getErrorMessage (e ));
@@ -134,7 +139,7 @@ private Object getDataFromRow(Row row, List<String> column) throws IOException {
134139 for (; index < row .getArity (); index ++) {
135140 columnData .put (column .get (index ), row .getField (index ));
136141 }
137- return HttpUtil . gson .toJson (columnData );
142+ return gson .toJson (columnData );
138143 } else {
139144 return row .getField (index );
140145 }
@@ -146,7 +151,7 @@ private void sendRequest(CloseableHttpClient httpClient,
146151 String method ,
147152 Map <String , String > header ,
148153 String url ) throws IOException {
149- LOG .debug ("当前发送的数据为:{}" , HttpUtil . gson .toJson (requestBody ));
154+ LOG .debug ("当前发送的数据为:{}" , gson .toJson (requestBody ));
150155 HttpRequestBase request = HttpUtil .getRequest (method , requestBody , header , url );
151156 //设置请求和传输超时时间
152157 RequestConfig requestConfig = RequestConfig .custom ()
0 commit comments