Skip to content

Commit bb127cd

Browse files
committed
fix: 1. 正确设置响应码,并在响应错误时,记录日志 2. 正确处理请求流复制
1 parent 5fd7d0b commit bb127cd

File tree

5 files changed

+309
-49
lines changed

5 files changed

+309
-49
lines changed

pom.xml

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,38 @@
4343
</dependencies>
4444

4545

46+
<build>
47+
<!--配置插件-->
48+
<plugins>
49+
<plugin>
50+
<groupId>org.apache.maven.plugins</groupId>
51+
<artifactId>maven-source-plugin</artifactId>
52+
<version>2.2.1</version>
53+
<executions>
54+
<execution>
55+
<id>attach-sources</id>
56+
<goals>
57+
<goal>jar-no-fork</goal>
58+
</goals>
59+
</execution>
60+
</executions>
61+
</plugin>
62+
<plugin>
63+
<groupId>org.apache.maven.plugins</groupId>
64+
<artifactId>maven-javadoc-plugin</artifactId>
65+
<version>2.9.1</version>
66+
<executions>
67+
<execution>
68+
<id>attach-javadocs</id>
69+
<goals>
70+
<goal>jar</goal>
71+
</goals>
72+
</execution>
73+
</executions>
74+
</plugin>
75+
</plugins>
76+
</build>
77+
78+
79+
4680
</project>

src/main/java/top/meethigher/proxy/http/ReverseHttpProxy.java

Lines changed: 53 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -372,22 +372,21 @@ protected String rewriteLocation(Route route, String url, String location) {
372372
return location;
373373
}
374374

375-
protected void doLog(Route route, HttpServerRequest realReq, HttpServerResponse realResp, String realUrl) {
376-
375+
protected void doLog(Route route, HttpServerRequest serverReq, HttpServerResponse serverResp, String proxyUrl) {
377376
if (route.getMetadata(P_LOG) != null && Boolean.parseBoolean(route.getMetadata(P_LOG))) {
378377
String logFormat = route.getMetadata(P_LOG_FORMAT).toString();
379378
if (logFormat == null || logFormat.isEmpty()) {
380379
logFormat = LOG_FORMAT_DEFAULT;
381380
}
382381
String logInfo = logFormat
383382
.replace("{name}", route.getName())
384-
.replace("{method}", realReq.method().toString())
385-
.replace("{userAgent}", realReq.getHeader("User-Agent"))
386-
.replace("{remoteAddr}", realReq.remoteAddress().hostAddress())
387-
.replace("{remotePort}", String.valueOf(realReq.remoteAddress().port()))
388-
.replace("{source}", realReq.uri())
389-
.replace("{target}", realUrl)
390-
.replace("{statusCode}", String.valueOf(realResp.getStatusCode()))
383+
.replace("{method}", serverReq.method().toString())
384+
.replace("{userAgent}", serverReq.getHeader("User-Agent"))
385+
.replace("{remoteAddr}", serverReq.remoteAddress().hostAddress())
386+
.replace("{remotePort}", String.valueOf(serverReq.remoteAddress().port()))
387+
.replace("{source}", serverReq.uri())
388+
.replace("{target}", proxyUrl)
389+
.replace("{statusCode}", String.valueOf(serverResp.getStatusCode()))
391390
.replace("{consumedMills}", String.valueOf(System.currentTimeMillis() - (Long) route.getMetadata(P_SEND_TIMESTAMP)));
392391
log.info(logInfo);
393392
}
@@ -396,83 +395,97 @@ protected void doLog(Route route, HttpServerRequest realReq, HttpServerResponse
396395
/**
397396
* 发起请求Handler
398397
*/
399-
protected Handler<AsyncResult<HttpClientResponse>> sendRequestHandler(Route route, HttpServerRequest realReq, HttpServerResponse realResp, String realUrl) {
398+
protected Handler<AsyncResult<HttpClientResponse>> sendRequestHandler(Route route, HttpServerRequest serverReq, HttpServerResponse serverResp, String proxyUrl) {
400399
return ar -> {
401400
if (ar.succeeded()) {
402-
HttpClientResponse proxyResp = ar.result();
401+
HttpClientResponse clientResp = ar.result();
402+
// 暂停流读取
403+
clientResp.pause();
403404
// 复制响应头。复制的过程中忽略逐跳标头
404-
copyResponseHeaders(route, realReq, realResp, proxyResp);
405-
if (!realResp.headers().contains("Content-Length")) {
406-
realResp.setChunked(true);
405+
copyResponseHeaders(route, serverReq, serverResp, clientResp);
406+
if (!serverResp.headers().contains("Content-Length")) {
407+
serverResp.setChunked(true);
407408
}
408409
// 设置响应码
409-
realResp.setStatusCode(proxyResp.statusCode());
410+
serverResp.setStatusCode(clientResp.statusCode());
410411
// 流输出
411-
proxyResp.pipeTo(realResp).onSuccess(v -> {
412-
doLog(route, realReq, realResp, realUrl);
412+
clientResp.pipeTo(serverResp).onSuccess(v -> {
413+
doLog(route, serverReq, serverResp, proxyUrl);
413414
}).onFailure(e -> {
414-
realResp.setStatusCode(502);
415-
realResp.end("Bad Gateway");
416-
log.error("{} {} proxy response copy error", realReq.method().name(), realUrl, e);
415+
badGateway(route, serverReq, serverResp, proxyUrl);
416+
log.error("{} {} proxy response copy error", serverReq.method().name(), proxyUrl, e);
417417
});
418-
419418
} else {
419+
badGateway(route, serverReq, serverResp, proxyUrl);
420420
Throwable e = ar.cause();
421-
realResp.setStatusCode(502);
422-
realResp.end("Bad Gateway");
423-
log.error("{} {} send request error", realReq.method().name(), realUrl, e);
421+
log.error("{} {} send request error", serverReq.method().name(), proxyUrl, e);
424422
}
425423
};
426424
}
427425

428426
/**
429427
* 建立连接Handler
430428
*/
431-
protected Handler<AsyncResult<HttpClientRequest>> connectHandler(Route route, HttpServerRequest realReq, HttpServerResponse realResp, String realUrl) {
429+
protected Handler<AsyncResult<HttpClientRequest>> connectHandler(Route route, HttpServerRequest serverReq, HttpServerResponse serverResp, String proxyUrl) {
432430
return ar -> {
433431
if (ar.succeeded()) {
434-
HttpClientRequest proxyReq = ar.result();
432+
HttpClientRequest clientReq = ar.result();
435433
// 复制请求头。复制的过程中忽略逐跳标头
436-
copyRequestHeaders(route, realReq, proxyReq);
434+
copyRequestHeaders(route, serverReq, clientReq);
437435
// 若存在请求体,则将请求体复制。使用流式复制,避免占用大量内存
438-
if (proxyReq.headers().contains("Content-Length") || proxyReq.headers().contains("Transfer-Encoding")) {
439-
realReq.pipeTo(proxyReq);
436+
if (clientReq.headers().contains("Content-Length") || clientReq.headers().contains("Transfer-Encoding")) {
437+
clientReq.send(serverReq).onComplete(sendRequestHandler(route, serverReq, serverResp, proxyUrl));
438+
} else {
439+
clientReq.send().onComplete(sendRequestHandler(route, serverReq, serverResp, proxyUrl));
440440
}
441-
// 发送请求
442-
route.putMetadata(P_SEND_TIMESTAMP, System.currentTimeMillis());
443-
proxyReq.send().onComplete(sendRequestHandler(route, realReq, realResp, realUrl));
444441
} else {
442+
badGateway(route, serverReq, serverResp, proxyUrl);
445443
Throwable e = ar.cause();
446-
log.error("{} {} open connection error", realReq.method().name(), realUrl, e);
444+
log.error("{} {} open connection error", serverReq.method().name(), proxyUrl, e);
447445
}
448446

449447
};
450448
}
451449

450+
private void badGateway(Route route, HttpServerRequest serverReq, HttpServerResponse serverResp, String proxyUrl) {
451+
if (!serverResp.ended()) {
452+
serverResp.setStatusCode(502).end("Bad Gateway");
453+
}
454+
doLog(route, serverReq, serverResp, proxyUrl);
455+
}
456+
452457
/**
453458
* 路由处理Handler
454459
*/
455460
protected Handler<RoutingContext> routingContextHandler(HttpClient httpClient) {
456461
return ctx -> {
457462
// vertx的uri()是包含query参数的。而path()才是我们常说的不带有query的uri
458463
Route route = ctx.currentRoute();
464+
465+
// 记录请求开始时间
466+
route.putMetadata(P_SEND_TIMESTAMP, System.currentTimeMillis());
467+
459468
String result = route.getMetadata(P_TARGET_URL).toString();
460-
HttpServerRequest realReq = ctx.request();
461-
HttpServerResponse realResp = ctx.response();
462-
String absoluteURI = realReq.absoluteURI();
469+
HttpServerRequest serverReq = ctx.request();
470+
HttpServerResponse serverResp = ctx.response();
471+
472+
// 暂停流读取
473+
serverReq.pause();
474+
475+
String absoluteURI = serverReq.absoluteURI();
463476
UrlParser.ParsedUrl parsedUrl = UrlParser.parseUrl(absoluteURI);
464477
String prefix = parsedUrl.getFormatHostPort() + (route.getMetadata(P_SOURCE_URL).toString().replace("/*", ""));
465-
String realUrl = result + (parsedUrl.getFormatUrl().replace(prefix, ""));
478+
String proxyUrl = result + (parsedUrl.getFormatUrl().replace(prefix, ""));
466479

467480

468481
// 构建请求参数
469482
RequestOptions requestOptions = new RequestOptions();
470-
requestOptions.setAbsoluteURI(realUrl);
471-
requestOptions.setMethod(realReq.method());
483+
requestOptions.setAbsoluteURI(proxyUrl);
484+
requestOptions.setMethod(serverReq.method());
472485
requestOptions.setFollowRedirects(route.getMetadata(P_FOLLOW_REDIRECTS) != null && Boolean.parseBoolean(route.getMetadata(P_FOLLOW_REDIRECTS)));
473486

474487
// 请求
475-
httpClient.request(requestOptions).onComplete(connectHandler(route, realReq, realResp, realUrl));
488+
httpClient.request(requestOptions).onComplete(connectHandler(route, serverReq, serverResp, proxyUrl));
476489
};
477490
}
478491

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package top.meethigher;
2+
3+
import io.vertx.core.Vertx;
4+
import io.vertx.core.http.HttpClient;
5+
import io.vertx.core.http.HttpClientRequest;
6+
import io.vertx.core.http.HttpMethod;
7+
import io.vertx.core.http.RequestOptions;
8+
import org.junit.Before;
9+
import org.junit.Test;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
import top.meethigher.proxy.http.ProxyRoute;
13+
import top.meethigher.proxy.http.ReverseHttpProxy;
14+
15+
import java.time.Duration;
16+
import java.util.concurrent.TimeUnit;
17+
18+
public class ReverseHttpProxyBugTest {
19+
20+
private static final Logger log = LoggerFactory.getLogger(ReverseHttpProxyBugTest.class);
21+
22+
private static final Vertx vertx = Vertx.vertx();
23+
24+
/**
25+
* 前置环境。
26+
* 启动一个10秒响应的backend
27+
* 启动一个反代backend的proxyserver
28+
*/
29+
@Before
30+
public void server() {
31+
32+
int backendServerPort = 888;
33+
int proxyServerPort = 8080;
34+
35+
// backend-server
36+
vertx.createHttpServer().requestHandler(req -> {
37+
vertx.setTimer(Duration.ofSeconds(10).toMillis(), id -> {
38+
req.response()
39+
.setStatusCode(200)
40+
.putHeader("Content-Type", "text/plain;charset=utf-8")
41+
.end("Hello World");
42+
});
43+
}).listen(backendServerPort, ar -> {
44+
if (ar.succeeded()) {
45+
log.info("backend started on port {}", backendServerPort);
46+
} else {
47+
log.error("backend start error", ar.cause());
48+
}
49+
});
50+
51+
// proxy-server
52+
ReverseHttpProxy.create(vertx).port(proxyServerPort).addRoute(
53+
new ProxyRoute()
54+
.setName("proxy")
55+
.setSourceUrl("/*")
56+
.setTargetUrl("http://127.0.0.1:" + backendServerPort)
57+
).start();
58+
59+
}
60+
61+
62+
/**
63+
* 客户端主动发起请求
64+
* 客户端--->proxy--->backend
65+
* <p>
66+
* 此时,客户端主动关闭,当proxy将backend的响应写回至客户端时,会发现连接已经断开了。
67+
* 客户端<-x-proxy<--backend
68+
* <p>
69+
* 此时就会报错: Response has already been written
70+
*/
71+
@Test
72+
public void reqClose() throws Exception {
73+
74+
HttpClient httpClient = vertx.createHttpClient();
75+
httpClient.request(new RequestOptions().setMethod(HttpMethod.GET).setAbsoluteURI("http://127.0.0.1:8080/bug-test"), ar -> {
76+
if (ar.succeeded()) {
77+
HttpClientRequest req = ar.result();
78+
79+
// 模拟浏览器断开
80+
vertx.setTimer(Duration.ofSeconds(2).toMillis(), id -> {
81+
req.connection().close();
82+
});
83+
84+
req.send().onFailure(e -> {
85+
log.error("send error", e);
86+
}).onSuccess(resp -> {
87+
log.info("statusCode: {}", resp.statusCode());
88+
});
89+
} else {
90+
Throwable cause = ar.cause();
91+
log.error("request error", cause, cause);
92+
}
93+
});
94+
95+
TimeUnit.SECONDS.sleep(20);
96+
}
97+
}

src/test/java/top/meethigher/ReverseHttpProxyTest.java

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,61 @@
11
package top.meethigher;
22

33
import io.vertx.core.Vertx;
4+
import io.vertx.core.http.PoolOptions;
5+
import io.vertx.ext.web.Router;
46
import org.junit.Test;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
59
import top.meethigher.proxy.http.ProxyRoute;
610
import top.meethigher.proxy.http.ReverseHttpProxy;
711

812
import java.util.concurrent.TimeUnit;
913

1014
public class ReverseHttpProxyTest {
1115

16+
17+
private static final Logger log = LoggerFactory.getLogger(ReverseHttpProxyTest.class);
18+
19+
private static final Vertx vertx = Vertx.vertx();
20+
21+
/**
22+
* 用于测试一系列常规的HTTP反代请求
23+
* 可以配置batch-curl.sh进行使用
24+
*/
1225
@Test
13-
public void testVertxHTTPReverseProxyTest() throws Exception {
14-
Vertx vertx = Vertx.vertx();
26+
public void testAllMethod() throws Exception {
1527
ReverseHttpProxy proxy = ReverseHttpProxy.create(
1628
vertx
1729
);
1830
proxy.port(8080);
1931
proxy.start();
2032
ProxyRoute proxyRoute = new ProxyRoute();
21-
proxyRoute.setSourceUrl("/test/*");
22-
proxyRoute.setTargetUrl("https://meethigher.top");
33+
proxyRoute.setSourceUrl("/*");
34+
proxyRoute.setTargetUrl("https://reqres.in");
2335
proxy.addRoute(proxyRoute);
24-
TimeUnit.MINUTES.sleep(2);
36+
TimeUnit.HOURS.sleep(1);
2537
proxy.stop();
2638
}
2739

2840

41+
/**
42+
* 用于测试省略端口时的反代场景
43+
*/
2944
@Test
3045
public void testDomain80() throws Exception {
31-
Vertx vertx = Vertx.vertx();
3246
ReverseHttpProxy proxy = ReverseHttpProxy.create(
33-
vertx
47+
Router.router(vertx),
48+
vertx.createHttpServer(),
49+
vertx.createHttpClient(new PoolOptions().setHttp1MaxSize(2000).setHttp2MaxSize(2000))
3450
);
3551
proxy.port(80);
3652
proxy.start();
3753
ProxyRoute proxyRoute = new ProxyRoute();
3854
proxyRoute.setSourceUrl("/*");
39-
proxyRoute.setTargetUrl("http://127.0.0.1:4000");
55+
proxyRoute.setTargetUrl("https://webst01.is.autonavi.com");
56+
proxyRoute.setHttpKeepAlive(false);
4057
proxy.addRoute(proxyRoute);
41-
TimeUnit.MINUTES.sleep(2);
58+
TimeUnit.HOURS.sleep(1);
4259
proxy.stop();
4360
}
4461

0 commit comments

Comments
 (0)