Skip to content

Commit bf25287

Browse files
committed
Restapi改造
1 parent 69f6861 commit bf25287

File tree

12 files changed

+291
-116
lines changed

12 files changed

+291
-116
lines changed

flinkx-restapi/flinkx-restapi-core/src/main/java/com/dtstack/flinkx/restapi/common/HttpUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public static HttpRequestBase getRequest(String method,
9292
post.setEntity(getEntityData(requestBody));
9393
request = post;
9494
} else {
95-
throw new RuntimeException("Unsupported method:" + method);
95+
throw new UnsupportedOperationException("Unsupported method:" + method);
9696
}
9797

9898
for (Map.Entry<String, String> entry : header.entrySet()) {

flinkx-restapi/flinkx-restapi-core/src/main/java/com/dtstack/flinkx/restapi/common/ParamFactory.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -69,28 +69,14 @@ public static List<ParamDefinition> createDefinition(ParamType paramType, Map<St
6969
}
7070

7171

72-
private static List<Paramitem> parse(String text, ParamDefinition paramDefinition, RestContext context) {
73-
List<Paramitem> valueItems = new ArrayList<>(12);
74-
Matcher matcher = valueExpression.matcher(text);
75-
String lastVarible = "";
76-
while (matcher.find()) {
77-
78-
String varible = matcher.group("varible");
79-
valueItems.add(ParamParse.parsr(varible));
80-
lastVarible = varible;
81-
}
82-
return valueItems;
83-
}
84-
85-
8672
public static List<Paramitem> getVarible(String text) {
8773
List<Paramitem> valueItems = new ArrayList<>(16);
8874
Matcher matcher = valueExpression.matcher(text);
8975
while (matcher.find()) {
9076
String varible = matcher.group("varible");
91-
valueItems.add( ParamParse.parsr(varible));
77+
valueItems.add(parsr(varible));
9278
}
93-
if(CollectionUtils.isEmpty(valueItems)){
79+
if (CollectionUtils.isEmpty(valueItems)) {
9480
return Collections.emptyList();
9581
}
9682
return valueItems;
@@ -101,4 +87,19 @@ public static boolean isDymatic(String text) {
10187
}
10288

10389

90+
public static Paramitem parsr(String value) {
91+
92+
if (value.startsWith("${") && value.endsWith("}")) {
93+
String substring = value.substring(2, value.length() - 1);
94+
if (substring.startsWith("body.") || substring.startsWith("param.") || substring.startsWith("header.") || substring.startsWith("response.")) {
95+
return new ReplaceParamItem(substring);
96+
} else if (InnerVaribleFactory.isInnerVariable(substring)) {
97+
return InnerVaribleFactory.createInnerVarible(substring);
98+
} else {
99+
return new ConstantVarible(value, value);
100+
}
101+
} else {
102+
return new ConstantVarible(value, value);
103+
}
104+
}
104105
}

flinkx-restapi/flinkx-restapi-core/src/main/java/com/dtstack/flinkx/restapi/common/ParamParse.java

Lines changed: 0 additions & 44 deletions
This file was deleted.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package com.dtstack.flinkx.restapi.common;
2+
3+
public class exception {
4+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.dtstack.flinkx.restapi.common.handler;
2+
3+
import java.util.Map;
4+
import java.util.Set;
5+
6+
public abstract class Handler {
7+
8+
private String key;
9+
10+
private Set<String> value;
11+
12+
public Handler(String key, Set<String> value) {
13+
this.key = key;
14+
this.value = value;
15+
}
16+
17+
public boolean isPipei(Map<String, Object> responseData) {
18+
return responseData.containsKey(key) && value.contains(responseData.get(key).toString());
19+
}
20+
21+
public abstract void execute(Map<String, Object> responseData);
22+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.dtstack.flinkx.restapi.common.handler;
2+
3+
public class ReadRecordException extends RuntimeException {
4+
public ReadRecordException() {
5+
}
6+
7+
public ReadRecordException(String message) {
8+
super(message);
9+
}
10+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package com.dtstack.flinkx.restapi.common.handler;
2+
3+
public class ResponseRetryException extends RuntimeException {
4+
}

flinkx-restapi/flinkx-restapi-reader/src/main/java/com/dtstack/flinkx/restapi/inputformat/HttpClient.java

Lines changed: 134 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,30 @@
1717
*/
1818
package com.dtstack.flinkx.restapi.inputformat;
1919

20+
import com.dtstack.flinkx.constants.ConstantValue;
21+
import com.dtstack.flinkx.reader.MetaColumn;
22+
import com.dtstack.flinkx.restapi.common.HttpUtil;
2023
import com.dtstack.flinkx.restapi.common.RestContext;
24+
import com.dtstack.flinkx.restapi.common.handler.Handler;
25+
import com.dtstack.flinkx.restapi.common.handler.ReadRecordException;
26+
import com.dtstack.flinkx.restapi.common.handler.ResponseRetryException;
27+
import com.dtstack.flinkx.restapi.common.httprequestApi;
2128
import com.dtstack.flinkx.util.ExceptionUtil;
29+
import org.apache.commons.collections.CollectionUtils;
2230
import org.apache.flink.types.Row;
31+
import org.apache.http.HttpEntity;
32+
import org.apache.http.client.methods.CloseableHttpResponse;
33+
import org.apache.http.client.methods.HttpUriRequest;
34+
import org.apache.http.impl.client.CloseableHttpClient;
35+
import org.apache.http.util.EntityUtils;
2336
import org.slf4j.Logger;
2437
import org.slf4j.LoggerFactory;
2538

26-
import java.util.concurrent.BlockingQueue;
39+
import java.util.HashMap;
40+
import java.util.List;
41+
import java.util.Map;
42+
import java.util.concurrent.*;
43+
import java.util.concurrent.atomic.AtomicInteger;
2744

2845
/**
2946
* httpClient
@@ -34,20 +51,39 @@
3451
public class HttpClient {
3552
private static final Logger LOG = LoggerFactory.getLogger(HttpClient.class);
3653

37-
private Thread workThread;
54+
private ScheduledExecutorService scheduledExecutorService;
55+
protected transient CloseableHttpClient httpClient;
56+
private final long intervalTime;
3857
private BlockingQueue<Row> queue;
3958
private RestContext restContext;
59+
private AtomicInteger atomicInteger = new AtomicInteger(0);
60+
private static final String THREAD_NAME = "restApiReader-thread";
61+
private List<MetaColumn> metaColumns;
62+
private List<Handler> handlers;
4063

41-
public HttpClient() {
4264

65+
public HttpClient(RestContext restContext, Long intervalTime, List<MetaColumn> metaColumns) {
66+
this.restContext = restContext;
67+
this.intervalTime = intervalTime;
68+
queue = new SynchronousQueue<>(false);
69+
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
70+
@Override
71+
public Thread newThread(Runnable r) {
72+
return new Thread(r, THREAD_NAME);
73+
}
74+
});
75+
this.httpClient = HttpUtil.getHttpClient();
76+
this.metaColumns = metaColumns;
4377
}
4478

4579
public void start() {
46-
this.workThread = new Thread(() -> {
47-
48-
});
4980

50-
workThread.start();
81+
scheduledExecutorService.scheduleAtFixedRate(
82+
this::execute,
83+
0,
84+
intervalTime,
85+
TimeUnit.MILLISECONDS
86+
);
5187
}
5288

5389
public Row takeEvent() {
@@ -60,4 +96,95 @@ public Row takeEvent() {
6096
return row;
6197
}
6298

99+
public void execute() {
100+
int i = atomicInteger.incrementAndGet();
101+
httprequestApi.Httprequest build = restContext.build();
102+
doExecute(build, 2);
103+
System.out.println("第" + i + "次请求值" + build);
104+
restContext.updateValue();
105+
}
106+
107+
public void doExecute(httprequestApi.Httprequest build, int retryTime) {
108+
109+
HttpUriRequest request = HttpUtil.getRequest(restContext.getRequestType(), build.getBody(), build.getHeader(), restContext.getUrl());
110+
try {
111+
CloseableHttpResponse httpResponse = httpClient.execute(request);
112+
HttpEntity entity = httpResponse.getEntity();
113+
if (entity != null) {
114+
String entityData = EntityUtils.toString(entity);
115+
if (restContext.getFormat().equals("json")) {
116+
Map<String, Object> map = HttpUtil.gson.fromJson(entityData, Map.class);
117+
//todo
118+
for (Handler handler : handlers) {
119+
if (handler.isPipei(map)) {
120+
handler.execute(map);
121+
}
122+
}
123+
if (CollectionUtils.isEmpty(metaColumns) || (metaColumns.size() == 1 && metaColumns.get(0).getName().equals(ConstantValue.STAR_SYMBOL))) {
124+
queue.put(Row.of(map));
125+
}else{
126+
HashMap<String, Object> stringObjectHashMap = new HashMap<>();
127+
for (MetaColumn metaColumn : metaColumns) {
128+
String[] names = metaColumn.getName().split("\\.");
129+
Map<String, Object> keyToMap = initData(stringObjectHashMap, names);
130+
Object data = getData(map, names);
131+
keyToMap.put(names[names.length - 1], data);
132+
}
133+
queue.put(Row.of(stringObjectHashMap));
134+
}
135+
} else {
136+
queue.put(Row.of(entityData));
137+
}
138+
} else {
139+
throw new RuntimeException("entity is null");
140+
}
141+
} catch (ResponseRetryException e) {
142+
//todo 重试
143+
if (--retryTime > 0) {
144+
doExecute(build, retryTime);
145+
}
146+
} catch (Exception e) {
147+
//todo 脏数据处理
148+
throw new ReadRecordException("get entity error");
149+
}
150+
151+
}
152+
153+
public void close() {
154+
HttpUtil.closeClient(httpClient);
155+
scheduledExecutorService.shutdown();
156+
}
157+
158+
public Map<String, Object> initData(HashMap<String, Object> data, String[] names) {
159+
HashMap<String, Object> tempHashMap = data;
160+
for (int i = 0; i < names.length; i++) {
161+
if (i != names.length - 1) {
162+
HashMap<String, Object> objectObjectHashMap = new HashMap<String, Object>(4);
163+
tempHashMap.putIfAbsent(names[i], objectObjectHashMap);
164+
tempHashMap = objectObjectHashMap;
165+
} else {
166+
tempHashMap.putIfAbsent(names[i], null);
167+
}
168+
}
169+
return tempHashMap;
170+
}
171+
172+
public Object getData(Map<String, Object> data, String[] names) {
173+
//metaColumns有可能为空 或者 有可能为*
174+
Map<String, Object> tempHashMap = data;
175+
for (int i = 0; i < names.length; i++) {
176+
if (tempHashMap.containsKey(names[i]) && i != names.length - 1) {
177+
if (tempHashMap.get(names[i]) instanceof Map) {
178+
tempHashMap = (Map<String, Object>) tempHashMap.get(names[i]);
179+
} else {
180+
return null;
181+
}
182+
} else if (i == names.length - 1) {
183+
return tempHashMap.get(names[i]);
184+
} else {
185+
return null;
186+
}
187+
}
188+
return null;
189+
}
63190
}

0 commit comments

Comments
 (0)