Skip to content

Commit 58fbb3b

Browse files
authored
#1494 [feat-1494][http] http support array data (#1495)
1 parent de5b214 commit 58fbb3b

File tree

17 files changed

+834
-76
lines changed

17 files changed

+834
-76
lines changed

chunjun-connectors/chunjun-connector-http/pom.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,21 @@
3131
<artifactId>chunjun-connector-http</artifactId>
3232
<name>ChunJun : Connectors : Http</name>
3333

34+
35+
<dependencies>
36+
<dependency>
37+
<groupId>org.dom4j</groupId>
38+
<artifactId>dom4j</artifactId>
39+
<version>2.1.3</version>
40+
</dependency>
41+
42+
<dependency>
43+
<groupId>net.sourceforge.javacsv</groupId>
44+
<artifactId>javacsv</artifactId>
45+
<version>2.0</version>
46+
</dependency>
47+
</dependencies>
48+
3449
<build>
3550
<plugins>
3651
<plugin>
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.chunjun.connector.http.client;
20+
21+
import com.dtstack.chunjun.connector.http.common.HttpRestConfig;
22+
import com.dtstack.chunjun.converter.AbstractRowConverter;
23+
24+
import com.csvreader.CsvReader;
25+
import org.apache.commons.collections.CollectionUtils;
26+
27+
import java.io.IOException;
28+
import java.io.Reader;
29+
import java.io.StringReader;
30+
import java.util.HashMap;
31+
import java.util.Map;
32+
33+
public class CsvResponseParse extends ResponseParse {
34+
private CsvReader csvReader;
35+
private Reader reader;
36+
private String responseValue;
37+
private HttpRequestParam requestParam;
38+
39+
public CsvResponseParse(HttpRestConfig config, AbstractRowConverter converter) {
40+
super(config, converter);
41+
if (CollectionUtils.isEmpty(columns)) {
42+
throw new RuntimeException("please configure column when decode is csv");
43+
}
44+
}
45+
46+
@Override
47+
public boolean hasNext() throws IOException {
48+
return csvReader.readRecord();
49+
}
50+
51+
@Override
52+
public ResponseValue next() throws Exception {
53+
String[] data = csvReader.getValues();
54+
HashMap<String, Object> stringObjectHashMap = new HashMap<>(columns.size());
55+
for (int i = 0; i < columns.size(); i++) {
56+
if (columns.get(i).getValue() != null) {
57+
stringObjectHashMap.put(columns.get(i).getName(), columns.get(i).getValue());
58+
} else {
59+
stringObjectHashMap.put(columns.get(i).getName(), data[i]);
60+
}
61+
}
62+
63+
return new ResponseValue(
64+
converter.toInternal(stringObjectHashMap), requestParam, responseValue);
65+
}
66+
67+
@Override
68+
public void parse(String responseValue, int responseStatus, HttpRequestParam requestParam) {
69+
this.responseValue = responseValue;
70+
this.requestParam = requestParam;
71+
this.reader = new StringReader(responseValue);
72+
this.csvReader = new CsvReader(reader);
73+
csvReader.setDelimiter(config.getCsvDelimiter().charAt(0));
74+
75+
Map<String, Object> csvConfig = config.getCsvConfig();
76+
// 是否跳过空行
77+
csvReader.setSkipEmptyRecords((Boolean) csvConfig.getOrDefault("skipEmptyRecords", true));
78+
// 是否使用csv转义字符
79+
csvReader.setUseTextQualifier((Boolean) csvConfig.getOrDefault("useTextQualifier", true));
80+
csvReader.setTrimWhitespace((Boolean) csvConfig.getOrDefault("trimWhitespace", false));
81+
// 单列长度是否限制100000字符
82+
csvReader.setSafetySwitch((Boolean) csvConfig.getOrDefault("safetySwitch", false));
83+
}
84+
}

chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/DefaultRestHandler.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -163,12 +163,6 @@ public HttpRequestParam buildRequestParam(
163163
return requestParam;
164164
}
165165

166-
@Override
167-
public ResponseValue buildResponseValue(
168-
String decode, String responseValue, String fields, HttpRequestParam requestParam) {
169-
return new ResponseValue(responseValue, requestParam, responseValue);
170-
}
171-
172166
/**
173167
* 根据指定的key 构建一个新的response
174168
*

chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/HttpClient.java

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.dtstack.chunjun.connector.http.common.HttpRestConfig;
2222
import com.dtstack.chunjun.connector.http.common.HttpUtil;
2323
import com.dtstack.chunjun.connector.http.common.MetaParam;
24+
import com.dtstack.chunjun.converter.AbstractRowConverter;
2425
import com.dtstack.chunjun.util.ExceptionUtil;
2526
import com.dtstack.chunjun.util.GsonUtil;
2627

@@ -41,6 +42,10 @@
4142
import java.util.concurrent.ScheduledThreadPoolExecutor;
4243
import java.util.concurrent.TimeUnit;
4344

45+
import static com.dtstack.chunjun.connector.http.common.ConstantValue.CSV_DECODE;
46+
import static com.dtstack.chunjun.connector.http.common.ConstantValue.TEXT_DECODE;
47+
import static com.dtstack.chunjun.connector.http.common.ConstantValue.XML_DECODE;
48+
4449
/**
4550
* httpClient
4651
*
@@ -60,6 +65,8 @@ public class HttpClient {
6065

6166
private final RestHandler restHandler;
6267

68+
protected final ResponseParse responseParse;
69+
6370
private int requestRetryTime;
6471

6572
/** origin body */
@@ -86,11 +93,14 @@ public class HttpClient {
8693

8794
private boolean running;
8895

96+
protected long requestNumber;
97+
8998
public HttpClient(
9099
HttpRestConfig httpRestConfig,
91100
List<MetaParam> originalBodyList,
92101
List<MetaParam> originalParamList,
93-
List<MetaParam> originalHeaderList) {
102+
List<MetaParam> originalHeaderList,
103+
AbstractRowConverter converter) {
94104
this.restConfig = httpRestConfig;
95105
this.originalHeaderList = originalHeaderList;
96106
this.originalBodyList = originalBodyList;
@@ -102,14 +112,16 @@ public HttpClient(
102112
this.queue = new LinkedBlockingQueue<>();
103113
this.scheduledExecutorService =
104114
new ScheduledThreadPoolExecutor(1, r -> new Thread(r, THREAD_NAME));
105-
this.httpClient = HttpUtil.getHttpsClient();
115+
this.httpClient = HttpUtil.getHttpsClient((int) restConfig.getTimeOut());
106116
this.restHandler = new DefaultRestHandler();
117+
this.responseParse = getResponseParse(converter);
107118

108119
this.prevResponse = "";
109120
this.first = true;
110121
this.currentParam = new HttpRequestParam();
111122
this.reachEnd = false;
112123
this.requestRetryTime = 2;
124+
this.requestNumber = 1;
113125
}
114126

115127
public void start() {
@@ -172,6 +184,7 @@ public void execute() {
172184
doExecute(ConstantValue.REQUEST_RETRY_TIME);
173185
first = false;
174186
requestRetryTime = 3;
187+
requestNumber++;
175188
}
176189

177190
public void doExecute(int retryTime) {
@@ -192,6 +205,7 @@ public void doExecute(int retryTime) {
192205

193206
// 执行请求
194207
String responseValue = null;
208+
int responseStatus;
195209
try {
196210

197211
HttpUriRequest request =
@@ -211,6 +225,7 @@ public void doExecute(int retryTime) {
211225
}
212226

213227
responseValue = EntityUtils.toString(httpResponse.getEntity());
228+
responseStatus = httpResponse.getStatusLine().getStatusCode();
214229
} catch (Throwable e) {
215230
// 只要本次请求中出现了异常 都会进行重试,如果重试次数达到了就真正结束任务
216231
LOG.warn(
@@ -244,27 +259,28 @@ public void doExecute(int retryTime) {
244259
case ConstantValue.STRATEGY_STOP:
245260
reachEnd = true;
246261
running = false;
262+
// stop 此次请求数据有问题 任务直接异常结束
263+
processData(new ResponseValue(0, null, strategy.toString(), null, null));
247264
break;
248265
default:
249266
break;
250267
}
251268
}
252269

253-
ResponseValue value =
254-
restHandler.buildResponseValue(
255-
restConfig.getDecode(),
256-
responseValue,
257-
restConfig.getFields(),
258-
HttpRequestParam.copy(currentParam));
270+
responseParse.parse(responseValue, responseStatus, HttpRequestParam.copy(currentParam));
271+
while (responseParse.hasNext()) {
272+
processData(responseParse.next());
273+
}
274+
275+
if (-1 != restConfig.getCycles() && requestNumber >= restConfig.getCycles()) {
276+
reachEnd = true;
277+
running = false;
278+
}
279+
259280
if (reachEnd) {
260281
// 如果结束了 需要告诉format 结束了
261-
if (value.isNormal()) {
262-
value.setStatus(0);
263-
// 触发的策略信息返回上游
264-
value.setErrorMsg(strategy.toString());
265-
}
282+
processData(new ResponseValue(2, null, null, null, null));
266283
}
267-
processData(value);
268284

269285
prevParam = currentParam;
270286
prevResponse = responseValue;
@@ -321,6 +337,19 @@ public void close() {
321337
}
322338
}
323339

340+
protected ResponseParse getResponseParse(AbstractRowConverter converter) {
341+
switch (restConfig.getDecode()) {
342+
case CSV_DECODE:
343+
return new CsvResponseParse(restConfig, converter);
344+
case XML_DECODE:
345+
return new XmlResponseParse(restConfig, converter);
346+
case TEXT_DECODE:
347+
return new TextResponseParse(restConfig, converter);
348+
default:
349+
return new JsonResponseParse(restConfig, converter);
350+
}
351+
}
352+
324353
@Override
325354
public String toString() {
326355
return "HttpClient{"
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.chunjun.connector.http.client;
20+
21+
import com.dtstack.chunjun.conf.FieldConf;
22+
import com.dtstack.chunjun.connector.http.common.HttpRestConfig;
23+
import com.dtstack.chunjun.connector.http.util.JsonPathUtil;
24+
import com.dtstack.chunjun.constants.ConstantValue;
25+
import com.dtstack.chunjun.converter.AbstractRowConverter;
26+
import com.dtstack.chunjun.util.GsonUtil;
27+
import com.dtstack.chunjun.util.MapUtil;
28+
29+
import com.google.common.collect.Lists;
30+
import com.google.gson.Gson;
31+
import org.apache.commons.collections.CollectionUtils;
32+
import org.apache.commons.lang3.StringUtils;
33+
34+
import java.io.IOException;
35+
import java.util.Arrays;
36+
import java.util.HashMap;
37+
import java.util.Iterator;
38+
import java.util.LinkedHashMap;
39+
import java.util.List;
40+
import java.util.Map;
41+
import java.util.stream.Collectors;
42+
43+
public class JsonResponseParse extends ResponseParse {
44+
private String responseValue;
45+
private HttpRequestParam requestParam;
46+
private final Gson gson;
47+
private final List<FieldConf> fields;
48+
private Iterator<Map<String, Object>> iterator;
49+
50+
public JsonResponseParse(HttpRestConfig config, AbstractRowConverter converter) {
51+
super(config, converter);
52+
this.gson = GsonUtil.setTypeAdapter(new Gson());
53+
if (StringUtils.isNotBlank(config.getFields())) {
54+
fields =
55+
Arrays.stream(config.getFields().split(","))
56+
.map(
57+
i -> {
58+
FieldConf fieldConf = new FieldConf();
59+
fieldConf.setName(i);
60+
return fieldConf;
61+
})
62+
.collect(Collectors.toList());
63+
} else {
64+
fields = null;
65+
}
66+
}
67+
68+
@Override
69+
public boolean hasNext() throws IOException {
70+
return iterator.hasNext();
71+
}
72+
73+
@Override
74+
public ResponseValue next() throws Exception {
75+
Map next = iterator.next();
76+
77+
if (CollectionUtils.isEmpty(columns)) {
78+
if (CollectionUtils.isNotEmpty(fields)) {
79+
LinkedHashMap<String, Object> map =
80+
buildResponseByKey(next, fields, ConstantValue.POINT_SYMBOL);
81+
HashMap<String, Object> data = new HashMap<>();
82+
// 需要拆分key
83+
((Map<String, Object>) map)
84+
.forEach(
85+
(k, v) -> {
86+
MapUtil.buildMap(k, ConstantValue.POINT_SYMBOL, v, data);
87+
});
88+
return new ResponseValue(converter.toInternal(data), requestParam, responseValue);
89+
} else {
90+
return new ResponseValue(converter.toInternal(next), requestParam, responseValue);
91+
}
92+
93+
} else {
94+
LinkedHashMap<String, Object> data =
95+
buildResponseByKey(next, columns, ConstantValue.POINT_SYMBOL);
96+
return new ResponseValue(converter.toInternal(data), requestParam, responseValue);
97+
}
98+
}
99+
100+
@Override
101+
public void parse(String responseValue, int responseStatus, HttpRequestParam requestParam) {
102+
this.responseValue = responseValue;
103+
this.requestParam = requestParam;
104+
105+
Map<String, Object> map = gson.fromJson(responseValue, GsonUtil.gsonMapTypeToken);
106+
if (StringUtils.isNotBlank(config.getDataSubject())) {
107+
Object valueByKey =
108+
MapUtil.getValueByKey(
109+
map,
110+
JsonPathUtil.parseJsonPath(config.getDataSubject()),
111+
ConstantValue.POINT_SYMBOL);
112+
if (valueByKey instanceof List) {
113+
this.iterator = ((List) valueByKey).iterator();
114+
} else {
115+
throw new RuntimeException(config.getDataSubject() + " in response is not array");
116+
}
117+
} else {
118+
this.iterator = Lists.newArrayList(map).iterator();
119+
}
120+
}
121+
}

0 commit comments

Comments
 (0)