Skip to content

Commit 9ff14bf

Browse files
committed
fix: 添加一次代理请求中两条连接的异常关闭监听,合理释放资源
1 parent bb127cd commit 9ff14bf

File tree

5 files changed

+83
-31
lines changed

5 files changed

+83
-31
lines changed

src/main/java/top/meethigher/proxy/http/ReverseHttpProxy.java

Lines changed: 70 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.vertx.core.*;
44
import io.vertx.core.http.*;
55
import io.vertx.core.json.JsonObject;
6+
import io.vertx.core.net.SocketAddress;
67
import io.vertx.ext.web.Route;
78
import io.vertx.ext.web.Router;
89
import io.vertx.ext.web.RoutingContext;
@@ -85,7 +86,17 @@ public class ReverseHttpProxy {
8586
/**
8687
* 请求发送的毫秒时间戳
8788
*/
88-
public static final String P_SEND_TIMESTAMP = "send.timestamp";
89+
protected static final String INTERNAL_SEND_TIMESTAMP = "internal.send.timestamp";
90+
91+
/**
92+
* 连接状态:客户端--代理服务
93+
*/
94+
protected static final String INTERNAL_CLIENT_CONNECTION_OPEN = "internal.client.connection.open";
95+
96+
/**
97+
* 连接状态:代理服务--后端服务
98+
*/
99+
protected static final String INTERNAL_PROXY_SERVER_CONNECTION_OPEN = "internal.client.proxyServer.connection.open";
89100

90101

91102
protected static final char[] ID_CHARACTERS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray();
@@ -152,12 +163,12 @@ public static ReverseHttpProxy create(Router router, HttpServer httpServer, Http
152163
}
153164

154165
protected static String generateName() {
155-
final String prefix = "VertxHTTPReverseProxy-";
166+
final String prefix = "ReverseHttpProxy-";
156167
try {
157168
// 池号对于虚拟机来说是全局的,以避免在类加载器范围的环境中池号重叠
158169
synchronized (System.getProperties()) {
159-
final String next = String.valueOf(Integer.getInteger("top.meethigher.proxy.http.VertxHTTPReverseProxy.name", 0) + 1);
160-
System.setProperty("top.meethigher.proxy.http.VertxHTTPReverseProxy.name", next);
170+
final String next = String.valueOf(Integer.getInteger("top.meethigher.proxy.http.ReverseHttpProxy.name", 0) + 1);
171+
System.setProperty("top.meethigher.proxy.http.ReverseHttpProxy.name", next);
161172
return prefix + next;
162173
}
163174
} catch (Exception e) {
@@ -387,7 +398,7 @@ protected void doLog(Route route, HttpServerRequest serverReq, HttpServerRespons
387398
.replace("{source}", serverReq.uri())
388399
.replace("{target}", proxyUrl)
389400
.replace("{statusCode}", String.valueOf(serverResp.getStatusCode()))
390-
.replace("{consumedMills}", String.valueOf(System.currentTimeMillis() - (Long) route.getMetadata(P_SEND_TIMESTAMP)));
401+
.replace("{consumedMills}", String.valueOf(System.currentTimeMillis() - (Long) route.getMetadata(INTERNAL_SEND_TIMESTAMP)));
391402
log.info(logInfo);
392403
}
393404
}
@@ -408,13 +419,16 @@ protected Handler<AsyncResult<HttpClientResponse>> sendRequestHandler(Route rout
408419
}
409420
// 设置响应码
410421
serverResp.setStatusCode(clientResp.statusCode());
411-
// 流输出
412-
clientResp.pipeTo(serverResp).onSuccess(v -> {
413-
doLog(route, serverReq, serverResp, proxyUrl);
414-
}).onFailure(e -> {
415-
badGateway(route, serverReq, serverResp, proxyUrl);
416-
log.error("{} {} proxy response copy error", serverReq.method().name(), proxyUrl, e);
417-
});
422+
if ((boolean) route.getMetadata(INTERNAL_PROXY_SERVER_CONNECTION_OPEN) && (boolean) route.getMetadata(INTERNAL_CLIENT_CONNECTION_OPEN)) {
423+
// 流输出
424+
clientResp.pipeTo(serverResp).onSuccess(v -> {
425+
doLog(route, serverReq, serverResp, proxyUrl);
426+
}).onFailure(e -> {
427+
badGateway(route, serverReq, serverResp, proxyUrl);
428+
log.error("{} {} proxy response copy error", serverReq.method().name(), proxyUrl, e);
429+
});
430+
}
431+
418432
} else {
419433
badGateway(route, serverReq, serverResp, proxyUrl);
420434
Throwable e = ar.cause();
@@ -430,13 +444,33 @@ protected Handler<AsyncResult<HttpClientRequest>> connectHandler(Route route, Ht
430444
return ar -> {
431445
if (ar.succeeded()) {
432446
HttpClientRequest clientReq = ar.result();
447+
// 记录连接状态
448+
route.putMetadata(INTERNAL_PROXY_SERVER_CONNECTION_OPEN, true);
449+
450+
// 注册客户端与代理服务之间连接的断开监听事件。可监听主动关闭和被动关闭
451+
HttpConnection connection = clientReq.connection();
452+
SocketAddress remoteAddress = connection.remoteAddress();
453+
SocketAddress localAddress = connection.localAddress();
454+
connection.closeHandler(v -> {
455+
route.putMetadata(INTERNAL_PROXY_SERVER_CONNECTION_OPEN, false);
456+
log.debug("proxyServer connection {}:{} -- {}:{} closed",
457+
localAddress.hostAddress(), localAddress.port(),
458+
remoteAddress.hostAddress(), remoteAddress.port());
459+
});
460+
433461
// 复制请求头。复制的过程中忽略逐跳标头
434462
copyRequestHeaders(route, serverReq, clientReq);
435-
// 若存在请求体,则将请求体复制。使用流式复制,避免占用大量内存
436-
if (clientReq.headers().contains("Content-Length") || clientReq.headers().contains("Transfer-Encoding")) {
437-
clientReq.send(serverReq).onComplete(sendRequestHandler(route, serverReq, serverResp, proxyUrl));
438-
} else {
439-
clientReq.send().onComplete(sendRequestHandler(route, serverReq, serverResp, proxyUrl));
463+
464+
if ((boolean) route.getMetadata(INTERNAL_PROXY_SERVER_CONNECTION_OPEN) && (boolean) route.getMetadata(INTERNAL_CLIENT_CONNECTION_OPEN)) {
465+
// 若存在请求体,则将请求体复制。使用流式复制,避免占用大量内存
466+
if (clientReq.headers().contains("Content-Length") || clientReq.headers().contains("Transfer-Encoding")) {
467+
clientReq.send(serverReq).onComplete(sendRequestHandler(route, serverReq, serverResp, proxyUrl));
468+
} else {
469+
clientReq.send().onComplete(sendRequestHandler(route, serverReq, serverResp, proxyUrl));
470+
}
471+
} else if ((boolean) route.getMetadata(INTERNAL_PROXY_SERVER_CONNECTION_OPEN) && !(boolean) route.getMetadata(INTERNAL_CLIENT_CONNECTION_OPEN)) {
472+
// 整体链路连接不可用,释放资源
473+
connection.close();
440474
}
441475
} else {
442476
badGateway(route, serverReq, serverResp, proxyUrl);
@@ -463,7 +497,9 @@ protected Handler<RoutingContext> routingContextHandler(HttpClient httpClient) {
463497
Route route = ctx.currentRoute();
464498

465499
// 记录请求开始时间
466-
route.putMetadata(P_SEND_TIMESTAMP, System.currentTimeMillis());
500+
route.putMetadata(INTERNAL_SEND_TIMESTAMP, System.currentTimeMillis());
501+
// 记录连接状态
502+
route.putMetadata(INTERNAL_CLIENT_CONNECTION_OPEN, true);
467503

468504
String result = route.getMetadata(P_TARGET_URL).toString();
469505
HttpServerRequest serverReq = ctx.request();
@@ -472,20 +508,34 @@ protected Handler<RoutingContext> routingContextHandler(HttpClient httpClient) {
472508
// 暂停流读取
473509
serverReq.pause();
474510

511+
475512
String absoluteURI = serverReq.absoluteURI();
476513
UrlParser.ParsedUrl parsedUrl = UrlParser.parseUrl(absoluteURI);
477514
String prefix = parsedUrl.getFormatHostPort() + (route.getMetadata(P_SOURCE_URL).toString().replace("/*", ""));
478515
String proxyUrl = result + (parsedUrl.getFormatUrl().replace(prefix, ""));
479516

480-
481517
// 构建请求参数
482518
RequestOptions requestOptions = new RequestOptions();
483519
requestOptions.setAbsoluteURI(proxyUrl);
484520
requestOptions.setMethod(serverReq.method());
485521
requestOptions.setFollowRedirects(route.getMetadata(P_FOLLOW_REDIRECTS) != null && Boolean.parseBoolean(route.getMetadata(P_FOLLOW_REDIRECTS)));
486522

523+
524+
// 注册客户端与代理服务之间连接的断开监听事件。可监听主动关闭和被动关闭
525+
HttpConnection connection = serverReq.connection();
526+
SocketAddress remoteAddress = connection.remoteAddress();
527+
SocketAddress localAddress = connection.localAddress();
528+
connection.closeHandler(v -> {
529+
route.putMetadata(INTERNAL_CLIENT_CONNECTION_OPEN, false);
530+
log.debug("client connection {}:{} -- {}:{} closed",
531+
remoteAddress.hostAddress(), remoteAddress.port(),
532+
localAddress.hostAddress(), localAddress.port());
533+
});
534+
487535
// 请求
488-
httpClient.request(requestOptions).onComplete(connectHandler(route, serverReq, serverResp, proxyUrl));
536+
if ((boolean) route.getMetadata(INTERNAL_CLIENT_CONNECTION_OPEN)) {
537+
httpClient.request(requestOptions).onComplete(connectHandler(route, serverReq, serverResp, proxyUrl));
538+
}
489539
};
490540
}
491541

src/main/java/top/meethigher/proxy/tcp/ReverseTcpProxy.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,12 @@ public ReverseTcpProxy host(String host) {
9393

9494

9595
private static String generateName() {
96-
final String prefix = "VertxTCPReverseProxy-";
96+
final String prefix = "ReverseTcpProxy-";
9797
try {
9898
// 池号对于虚拟机来说是全局的,以避免在类加载器范围的环境中池号重叠
9999
synchronized (System.getProperties()) {
100-
final String next = String.valueOf(Integer.getInteger("top.meethigher.proxy.tcp.VertxTCPReverseProxy.name", 0) + 1);
101-
System.setProperty("top.meethigher.proxy.tcp.VertxTCPReverseProxy.name", next);
100+
final String next = String.valueOf(Integer.getInteger("top.meethigher.proxy.tcp.ReverseTcpProxy.name", 0) + 1);
101+
System.setProperty("top.meethigher.proxy.tcp.ReverseTcpProxy.name", next);
102102
return prefix + next;
103103
}
104104
} catch (Exception e) {

src/main/java/top/meethigher/proxy/tcp/SimpleReverseTcpProxy.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -205,12 +205,12 @@ public void stop() {
205205
}
206206

207207
private static String generateName() {
208-
final String prefix = "TCPReverseProxy-";
208+
final String prefix = "SimpleReverseTcpProxy-";
209209
try {
210210
// 池号对于虚拟机来说是全局的,以避免在类加载器范围的环境中池号重叠
211211
synchronized (System.getProperties()) {
212-
final String next = String.valueOf(Integer.getInteger("top.meethigher.proxy.tcp.TcpReverseProxy.name", 0) + 1);
213-
System.setProperty("top.meethigher.proxy.tcp.TcpReverseProxy.name", next);
212+
final String next = String.valueOf(Integer.getInteger("top.meethigher.proxy.tcp.SimpleReverseTcpProxy.name", 0) + 1);
213+
System.setProperty("top.meethigher.proxy.tcp.SimpleReverseTcpProxy.name", next);
214214
return prefix + next;
215215
}
216216
} catch (Exception e) {

src/test/java/top/meethigher/ReverseHttpProxyBugTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package top.meethigher;
22

33
import io.vertx.core.Vertx;
4-
import io.vertx.core.http.HttpClient;
5-
import io.vertx.core.http.HttpClientRequest;
6-
import io.vertx.core.http.HttpMethod;
7-
import io.vertx.core.http.RequestOptions;
4+
import io.vertx.core.http.*;
5+
import io.vertx.ext.web.Router;
86
import org.junit.Before;
97
import org.junit.Test;
108
import org.slf4j.Logger;
@@ -49,7 +47,10 @@ public void server() {
4947
});
5048

5149
// proxy-server
52-
ReverseHttpProxy.create(vertx).port(proxyServerPort).addRoute(
50+
Router router = Router.router(vertx);
51+
HttpServer httpServer = vertx.createHttpServer();
52+
HttpClient httpClient = vertx.createHttpClient(new HttpClientOptions(), new PoolOptions());
53+
ReverseHttpProxy.create(router, httpServer, httpClient).port(proxyServerPort).addRoute(
5354
new ProxyRoute()
5455
.setName("proxy")
5556
.setSourceUrl("/*")

src/test/java/top/meethigher/ReverseHttpProxyTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public void testAllMethod() throws Exception {
3030
proxy.port(8080);
3131
proxy.start();
3232
ProxyRoute proxyRoute = new ProxyRoute();
33+
proxyRoute.setHttpKeepAlive(false);
3334
proxyRoute.setSourceUrl("/*");
3435
proxyRoute.setTargetUrl("https://reqres.in");
3536
proxy.addRoute(proxyRoute);

0 commit comments

Comments
 (0)