Skip to content

Commit 321f775

Browse files
committed
[hotfix-32418][restapi] 将restapi的改动合并
1 parent f81ff4c commit 321f775

File tree

2 files changed

+10
-5
lines changed

2 files changed

+10
-5
lines changed

flinkx-restapi/flinkx-restapi-core/src/main/java/com/dtstack/flinkx/restapi/common/RestapiKeys.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,5 @@ public class RestapiKeys {
2929
public static final String KEY_COLUMN = "column";
3030
public static final String KEY_URL = "url";
3131
public static final String KEY_BATCH_INTERVAL = "batchInterval";
32+
public static final String KEY_BATCH = "batchId";
3233
}

flinkx-restapi/flinkx-restapi-writer/src/main/java/com/dtstack/flinkx/restapi/outputformat/RestapiOutputFormat.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import java.util.Map;
3939
import java.util.UUID;
4040

41+
import static com.dtstack.flinkx.restapi.common.RestapiKeys.KEY_BATCH;
42+
4143
/**
4244
* @author : tiezhu
4345
* @date : 2020/3/12
@@ -73,9 +75,10 @@ protected void writeSingleRecordInternal(Row row) throws WriteRecordException {
7375
CloseableHttpClient httpClient = HttpUtil.getHttpClient();
7476
int index = 0;
7577
Map<String, Object> requestBody = Maps.newHashMap();
76-
Object dataRow;
78+
List<Object> dataRow = new ArrayList<>();
7779
try {
78-
dataRow = getDataFromRow(row, column);
80+
dataRow.add(getDataFromRow(row, column));
81+
params.put(KEY_BATCH, UUID.randomUUID().toString().substring(0, 8));
7982
if (!params.isEmpty()) {
8083
Iterator iterator = params.entrySet().iterator();
8184
while (iterator.hasNext()) {
@@ -85,7 +88,7 @@ protected void writeSingleRecordInternal(Row row) throws WriteRecordException {
8588
}
8689
body.put("data", dataRow);
8790
requestBody.put("json", body);
88-
LOG.debug("当前发送的数据为:{}", gson.toJson(requestBody));
91+
LOG.info("send data:{}", gson.toJson(requestBody));
8992
sendRequest(httpClient, requestBody, method, header, url);
9093
} catch (Exception e) {
9194
requestErrorMessage(e, index, row);
@@ -107,6 +110,7 @@ protected void writeMultipleRecordsInternal() throws Exception {
107110
for (Row row : rows) {
108111
dataRow.add(getDataFromRow(row, column));
109112
}
113+
params.put(KEY_BATCH, UUID.randomUUID().toString().substring(0, 8));
110114
if (!params.isEmpty()) {
111115
Iterator iterator = params.entrySet().iterator();
112116
while (iterator.hasNext()) {
@@ -116,7 +120,7 @@ protected void writeMultipleRecordsInternal() throws Exception {
116120
}
117121
body.put("data", dataRow);
118122
requestBody.put("json", body);
119-
LOG.debug("当前发送的数据为:{}", gson.toJson(requestBody));
123+
LOG.info("this batch size = {}, send data:{}", rows.size(), gson.toJson(requestBody));
120124
sendRequest(httpClient, requestBody, method, header, url);
121125
} catch (Exception e) {
122126
LOG.error(ExceptionUtil.getErrorMessage(e));
@@ -151,7 +155,7 @@ private void sendRequest(CloseableHttpClient httpClient,
151155
String method,
152156
Map<String, String> header,
153157
String url) throws IOException {
154-
LOG.debug("当前发送的数据为:{}", gson.toJson(requestBody));
158+
LOG.debug("send data:{}", gson.toJson(requestBody));
155159
HttpRequestBase request = HttpUtil.getRequest(method, requestBody, header, url);
156160
//设置请求和传输超时时间
157161
RequestConfig requestConfig = RequestConfig.custom()

0 commit comments

Comments
 (0)