Skip to content

Commit dbe844d

Browse files
author
gituser
committed
Merge branch 'hotfix_1.8_4.0.x_30332' into 1.8_release_4.0.x
2 parents 450869d + 53e554e commit dbe844d

File tree

3 files changed

+16
-9
lines changed

3 files changed

+16
-9
lines changed

flinkx-metadata-oracle/flinkx-metadata-oracle-reader/src/main/java/com/dtstack/flinkx/metadataoracle/inputformat/MetadataoracleInputFormat.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.dtstack.flinkx.constants.ConstantValue;
2222
import com.dtstack.flinkx.metadata.inputformat.BaseMetadataInputFormat;
2323
import org.apache.commons.collections.CollectionUtils;
24+
import org.apache.commons.lang3.StringUtils;
2425

2526
import java.sql.ResultSet;
2627
import java.sql.SQLException;
@@ -105,7 +106,7 @@ protected Map<String, Object> queryMetaData(String tableName) throws SQLExceptio
105106

106107
Map<String, Map<String, String> > queryTableProperties() throws SQLException {
107108
Map<String, Map<String, String>> tablePropertiesMap = new HashMap<>(16);
108-
if(allTable==null){
109+
if(StringUtils.isBlank(allTable)){
109110
sql = String.format(SQL_QUERY_TABLE_PROPERTIES_TOTAL, quote(currentDb.get()));
110111
}else {
111112
sql = String.format(SQL_QUERY_TABLE_PROPERTIES, quote(currentDb.get()), allTable);
@@ -126,7 +127,7 @@ Map<String, Map<String, String> > queryTableProperties() throws SQLException {
126127

127128
Map<String, List<Map<String, String>>> queryIndexList() throws SQLException {
128129
Map<String, List<Map<String, String>>> indexListMap = new HashMap<>(16);
129-
if(allTable==null){
130+
if(StringUtils.isBlank(allTable)){
130131
sql = String.format(SQL_QUERY_INDEX_TOTAL, quote(currentDb.get()));
131132
}else {
132133
sql = String.format(SQL_QUERY_INDEX, quote(currentDb.get()), allTable);
@@ -152,7 +153,7 @@ Map<String, List<Map<String, String>>> queryIndexList() throws SQLException {
152153

153154
Map<String, List<Map<String, Object>>> queryColumnList() throws SQLException {
154155
Map<String, List<Map<String, Object>>> columnListMap = new HashMap<>(16);
155-
if(allTable==null){
156+
if(StringUtils.isBlank(allTable)){
156157
sql = String.format(SQL_QUERY_COLUMN_TOTAL, quote(currentDb.get()));
157158
}else {
158159
sql = String.format(SQL_QUERY_COLUMN, quote(currentDb.get()), allTable);

flinkx-metadata/flinkx-metadata-reader/src/main/java/com/dtstack/flinkx/metadata/inputformat/BaseMetadataInputFormat.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,9 @@ protected void openInternal(InputSplit inputSplit) throws IOException {
8282
tableIterator.set(tableList.iterator());
8383
init();
8484
} catch (SQLException | ClassNotFoundException e) {
85-
LOG.error("获取table列表异常, dbUrl = {}, username = {}, inputSplit = {}, e = {}", dbUrl, username, inputSplit, ExceptionUtil.getErrorMessage(e));
86-
throw new IOException("获取table列表异常", e);
85+
String message = String.format("获取table列表异常, dbUrl = %s, username = %s, inputSplit = %s, e = %s", dbUrl, username, inputSplit, ExceptionUtil.getErrorMessage(e));
86+
LOG.error(message);
87+
throw new IOException(message, e);
8788
}
8889
}
8990

flinkx-restapi/flinkx-restapi-writer/src/main/java/com/dtstack/flinkx/restapi/outputformat/RestapiOutputFormat.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import com.dtstack.flinkx.restapi.common.HttpUtil;
2323
import com.dtstack.flinkx.util.ExceptionUtil;
2424
import com.google.common.collect.Maps;
25+
import com.google.gson.Gson;
26+
import com.google.gson.GsonBuilder;
2527
import org.apache.flink.types.Row;
2628
import org.apache.http.HttpStatus;
2729
import org.apache.http.client.config.RequestConfig;
@@ -57,9 +59,12 @@ public class RestapiOutputFormat extends BaseRichOutputFormat {
5759

5860
protected static final int DEFAULT_TIME_OUT = 300000;
5961

62+
protected Gson gson;
63+
6064
@Override
6165
protected void openInternal(int taskNumber, int numTasks) throws IOException {
6266
params.put("threadId", UUID.randomUUID().toString().substring(0, 8));
67+
gson = new GsonBuilder().serializeNulls().create();
6368
}
6469

6570
@Override
@@ -80,7 +85,7 @@ protected void writeSingleRecordInternal(Row row) throws WriteRecordException {
8085
}
8186
body.put("data", dataRow);
8287
requestBody.put("json", body);
83-
LOG.debug("当前发送的数据为:{}", HttpUtil.gson.toJson(requestBody));
88+
LOG.debug("当前发送的数据为:{}", gson.toJson(requestBody));
8489
sendRequest(httpClient, requestBody, method, header, url);
8590
} catch (Exception e) {
8691
requestErrorMessage(e, index, row);
@@ -111,7 +116,7 @@ protected void writeMultipleRecordsInternal() throws Exception {
111116
}
112117
body.put("data", dataRow);
113118
requestBody.put("json", body);
114-
LOG.debug("当前发送的数据为:{}", HttpUtil.gson.toJson(requestBody));
119+
LOG.debug("当前发送的数据为:{}", gson.toJson(requestBody));
115120
sendRequest(httpClient, requestBody, method, header, url);
116121
} catch (Exception e) {
117122
LOG.error(ExceptionUtil.getErrorMessage(e));
@@ -134,7 +139,7 @@ private Object getDataFromRow(Row row, List<String> column) throws IOException {
134139
for (; index < row.getArity(); index++) {
135140
columnData.put(column.get(index), row.getField(index));
136141
}
137-
return HttpUtil.gson.toJson(columnData);
142+
return gson.toJson(columnData);
138143
} else {
139144
return row.getField(index);
140145
}
@@ -146,7 +151,7 @@ private void sendRequest(CloseableHttpClient httpClient,
146151
String method,
147152
Map<String, String> header,
148153
String url) throws IOException {
149-
LOG.debug("当前发送的数据为:{}", HttpUtil.gson.toJson(requestBody));
154+
LOG.debug("当前发送的数据为:{}", gson.toJson(requestBody));
150155
HttpRequestBase request = HttpUtil.getRequest(method, requestBody, header, url);
151156
//设置请求和传输超时时间
152157
RequestConfig requestConfig = RequestConfig.custom()

0 commit comments

Comments
 (0)