Skip to content

Commit 6fbb9b3

Browse files
committed
[fix-31977][metadatatidb][restapi] 解决在库名带.时切换失败的问题
1 parent 7c53937 commit 6fbb9b3

File tree

3 files changed

+11
-5
lines changed
  • flinkx-metadata-tidb/flinkx-metadata-tidb-reader/src/main/java/com/dtstack/flinkx/metadatatidb/constants
  • flinkx-restapi

3 files changed

+11
-5
lines changed

flinkx-metadata-tidb/flinkx-metadata-tidb-reader/src/main/java/com/dtstack/flinkx/metadatatidb/constants/TidbMetadataCons.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public class TidbMetadataCons extends MetaDataCons {
5353
public static final String RESULT_COMMENT = "Comment";
5454

5555
/** sql语句 */
56+
public static final String SQL_SWITCH_DATABASE = "USE `%s`";
5657
public static final String SQL_SHOW_TABLES = "SHOW FULL TABLES WHERE Table_type = 'BASE TABLE'";
5758
public static final String SQL_QUERY_TABLE_INFO = "SHOW TABLE STATUS LIKE '%s'";
5859
public static final String SQL_QUERY_COLUMN = "SHOW FULL COLUMNS FROM %s";

flinkx-restapi/flinkx-restapi-core/src/main/java/com/dtstack/flinkx/restapi/common/RestapiKeys.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,5 @@ public class RestapiKeys {
2929
public static final String KEY_COLUMN = "column";
3030
public static final String KEY_URL = "url";
3131
public static final String KEY_BATCH_INTERVAL = "batchInterval";
32+
public static final String KEY_BATCH = "batchId";
3233
}

flinkx-restapi/flinkx-restapi-writer/src/main/java/com/dtstack/flinkx/restapi/outputformat/RestapiOutputFormat.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import java.util.Map;
3939
import java.util.UUID;
4040

41+
import static com.dtstack.flinkx.restapi.common.RestapiKeys.KEY_BATCH;
42+
4143
/**
4244
* @author : tiezhu
4345
* @date : 2020/3/12
@@ -73,9 +75,10 @@ protected void writeSingleRecordInternal(Row row) throws WriteRecordException {
7375
CloseableHttpClient httpClient = HttpUtil.getHttpClient();
7476
int index = 0;
7577
Map<String, Object> requestBody = Maps.newHashMap();
76-
Object dataRow;
78+
List<Object> dataRow = new ArrayList<>();
7779
try {
78-
dataRow = getDataFromRow(row, column);
80+
dataRow.add(getDataFromRow(row, column));
81+
params.put(KEY_BATCH, UUID.randomUUID().toString().substring(0, 8));
7982
if (!params.isEmpty()) {
8083
Iterator iterator = params.entrySet().iterator();
8184
while (iterator.hasNext()) {
@@ -85,7 +88,7 @@ protected void writeSingleRecordInternal(Row row) throws WriteRecordException {
8588
}
8689
body.put("data", dataRow);
8790
requestBody.put("json", body);
88-
LOG.debug("当前发送的数据为:{}", gson.toJson(requestBody));
91+
LOG.info("send data:{}", gson.toJson(requestBody));
8992
sendRequest(httpClient, requestBody, method, header, url);
9093
} catch (Exception e) {
9194
requestErrorMessage(e, index, row);
@@ -107,6 +110,7 @@ protected void writeMultipleRecordsInternal() throws Exception {
107110
for (Row row : rows) {
108111
dataRow.add(getDataFromRow(row, column));
109112
}
113+
params.put(KEY_BATCH, UUID.randomUUID().toString().substring(0, 8));
110114
if (!params.isEmpty()) {
111115
Iterator iterator = params.entrySet().iterator();
112116
while (iterator.hasNext()) {
@@ -116,7 +120,7 @@ protected void writeMultipleRecordsInternal() throws Exception {
116120
}
117121
body.put("data", dataRow);
118122
requestBody.put("json", body);
119-
LOG.debug("当前发送的数据为:{}", gson.toJson(requestBody));
123+
LOG.info("this batch size = {}, send data:{}", rows.size(), gson.toJson(requestBody));
120124
sendRequest(httpClient, requestBody, method, header, url);
121125
} catch (Exception e) {
122126
LOG.error(ExceptionUtil.getErrorMessage(e));
@@ -151,7 +155,7 @@ private void sendRequest(CloseableHttpClient httpClient,
151155
String method,
152156
Map<String, String> header,
153157
String url) throws IOException {
154-
LOG.debug("当前发送的数据为:{}", gson.toJson(requestBody));
158+
LOG.debug("send data:{}", gson.toJson(requestBody));
155159
HttpRequestBase request = HttpUtil.getRequest(method, requestBody, header, url);
156160
//设置请求和传输超时时间
157161
RequestConfig requestConfig = RequestConfig.custom()

0 commit comments

Comments
 (0)