Skip to content

Commit e3b06c7

Browse files
committed
fix: 初次解决转发短连接时的问题
1 parent 1015624 commit e3b06c7

File tree

4 files changed

+100
-101
lines changed

4 files changed

+100
-101
lines changed

src/main/java/top/meethigher/proxy/http/ReverseHttpProxy.java

Lines changed: 54 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,14 @@ public class ReverseHttpProxy {
3535
*/
3636
public static final String LOG_FORMAT_DEFAULT = "" +
3737
"{name} -- " +
38+
"{method} -- " +
3839
"{serverHttpVersion} -- " +
3940
"{clientHttpVersion} -- " +
40-
"{method} -- " +
4141
"{userAgent} -- " +
4242
"{serverRemoteAddr} -- " +
43+
"{serverLocalAddr} -- " +
4344
"{clientLocalAddr} -- " +
45+
"{clientRemoteAddr} -- " +
4446
"{sourceUri} -- " +
4547
"{proxyUrl} -- " +
4648
"{statusCode} -- " +
@@ -113,12 +115,12 @@ public class ReverseHttpProxy {
113115
/**
114116
* 连接状态:客户端--代理服务
115117
*/
116-
protected static final String INTERNAL_CLIENT_CONNECTION_OPEN = "INTERNAL_CLIENT_CONNECTION_OPEN";
118+
protected static final String INTERNAL_SERVER_CONNECTION_OPEN = "INTERNAL_SERVER_CONNECTION_OPEN";
117119

118120
/**
119121
* 连接状态:代理服务--后端服务
120122
*/
121-
protected static final String INTERNAL_PROXY_SERVER_CONNECTION_OPEN = "INTERNAL_PROXY_SERVER_CONNECTION_OPEN";
123+
protected static final String INTERNAL_CLIENT_CONNECTION_OPEN = "INTERNAL_CLIENT_CONNECTION_OPEN";
122124

123125
/**
124126
* 代理服务接收请求的HTTP版本
@@ -146,12 +148,21 @@ public class ReverseHttpProxy {
146148
*/
147149
protected static final String INTERNAL_SERVER_REMOTE_ADDR = "INTERNAL_SERVER_REMOTE_ADDR";
148150

151+
/**
152+
* 代理服务收到的请求的本地地址
153+
*/
154+
protected static final String INTERNAL_SERVER_LOCAL_ADDR = "INTERNAL_SERVER_LOCAL_ADDR";
149155

150156
/**
151157
* 代理服务发起请求的本端地址
152158
*/
153159
protected static final String INTERNAL_CLIENT_LOCAL_ADDR = "INTERNAL_CLIENT_LOCAL_ADDR";
154160

161+
/**
162+
* 代理服务发起请求的远端地址
163+
*/
164+
protected static final String INTERNAL_CLIENT_REMOTE_ADDR = "INTERNAL_CLIENT_REMOTE_ADDR";
165+
155166

156167
/**
157168
* 代理服务收到的请求路径
@@ -300,15 +311,17 @@ protected void doLog(RoutingContext ctx) {
300311
}
301312
String logInfo = logFormat
302313
.replace("{name}", getContextData(ctx, P_NAME).toString())
303-
.replace("{serverHttpVersion}", getContextData(ctx, INTERNAL_SERVER_HTTP_VERSION).toString())
304-
.replace("{clientHttpVersion}", getContextData(ctx, INTERNAL_CLIENT_HTTP_VERSION).toString())
305314
.replace("{method}", getContextData(ctx, INTERNAL_METHOD).toString())
306315
.replace("{userAgent}", getContextData(ctx, INTERNAL_USER_AGENT).toString())
307-
.replace("{serverRemoteAddr}", getContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR).toString())
308-
.replace("{clientLocalAddr}", getContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR).toString())
309316
.replace("{sourceUri}", getContextData(ctx, INTERNAL_SOURCE_URI).toString())
310317
.replace("{proxyUrl}", getContextData(ctx, INTERNAL_PROXY_URL).toString())
311318
.replace("{statusCode}", getContextData(ctx, INTERNAL_STATUS_CODE).toString())
319+
.replace("{serverHttpVersion}", getContextData(ctx, INTERNAL_SERVER_HTTP_VERSION).toString())
320+
.replace("{clientHttpVersion}", getContextData(ctx, INTERNAL_CLIENT_HTTP_VERSION).toString())
321+
.replace("{serverRemoteAddr}", getContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR).toString())
322+
.replace("{serverLocalAddr}", getContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR).toString())
323+
.replace("{clientLocalAddr}", getContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR).toString())
324+
.replace("{clientRemoteAddr}", getContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR).toString())
312325
.replace("{consumedMills}", String.valueOf(System.currentTimeMillis() - (Long) getContextData(ctx, INTERNAL_SEND_TIMESTAMP)));
313326
log.info(logInfo);
314327
}
@@ -333,8 +346,7 @@ public void start() {
333346
if (ar.succeeded()) {
334347
log.info("{} started on {}:{}", name, sourceHost, sourcePort);
335348
} else {
336-
Throwable e = ar.cause();
337-
log.error("{} start failed", name, e);
349+
log.error("{} start failed", name, ar.cause());
338350

339351
}
340352
};
@@ -576,20 +588,26 @@ protected Handler<AsyncResult<HttpClientResponse>> sendRequestHandler(RoutingCon
576588
// 设置响应码
577589
setStatusCode(ctx, serverResp, clientResp.statusCode());
578590

579-
if ((boolean) getContextData(ctx, INTERNAL_PROXY_SERVER_CONNECTION_OPEN) && (boolean) getContextData(ctx, INTERNAL_CLIENT_CONNECTION_OPEN)) {
591+
if ((boolean) getContextData(ctx, INTERNAL_CLIENT_CONNECTION_OPEN) && (boolean) getContextData(ctx, INTERNAL_SERVER_CONNECTION_OPEN)) {
580592
// 流输出
581-
clientResp.pipeTo(serverResp).onSuccess(v -> {
582-
doLog(ctx);
583-
}).onFailure(e -> {
584-
badGateway(ctx, serverResp);
585-
log.error("{} {} clientResp pipeto serverResp error", serverReq.method().name(), proxyUrl, e);
593+
clientResp.pipeTo(serverResp).onComplete(ar1 -> {
594+
if (ar1.succeeded()) {
595+
doLog(ctx);
596+
} else {
597+
badGateway(ctx, serverResp);
598+
log.error("pipeTo failed. {} <-- {} <-- {} <-- {}",
599+
getContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR),
600+
getContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR),
601+
getContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR),
602+
getContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR),
603+
ar1.cause());
604+
}
586605
});
587606
}
588607

589608
} else {
590609
badGateway(ctx, serverResp);
591-
Throwable e = ar.cause();
592-
log.error("{} {} send request error", serverReq.method().name(), proxyUrl, e);
610+
log.error("{} {} send request error", serverReq.method().name(), proxyUrl, ar.cause());
593611
}
594612
};
595613
}
@@ -609,35 +627,34 @@ protected Handler<AsyncResult<HttpClientRequest>> connectHandler(RoutingContext
609627
HttpClientRequest clientReq = ar.result();
610628
setContextData(ctx, INTERNAL_CLIENT_HTTP_VERSION, clientReq.version().alpnName());
611629
// 记录连接状态
612-
setContextData(ctx, INTERNAL_PROXY_SERVER_CONNECTION_OPEN, true);
630+
setContextData(ctx, INTERNAL_CLIENT_CONNECTION_OPEN, true);
613631

614632
// 注册客户端与代理服务之间连接的断开监听事件。可监听主动关闭和被动关闭
615633
setContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR, clientReq.connection().localAddress().toString());
634+
setContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR, clientReq.connection().remoteAddress().toString());
635+
log.debug("{} --> {} connected", getContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR), getContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR));
636+
637+
616638
clientReq.connection().closeHandler(v -> {
617-
setContextData(ctx, INTERNAL_PROXY_SERVER_CONNECTION_OPEN, false);
618-
log.debug("proxyClient local connection {} closed",
619-
getContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR).toString());
639+
setContextData(ctx, INTERNAL_CLIENT_CONNECTION_OPEN, false);
640+
log.debug("{} --> {} closed", getContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR), getContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR));
620641
});
621642

622643

623644
// 复制请求头。复制的过程中忽略逐跳标头
624645
copyRequestHeaders(ctx, serverReq, clientReq);
625646

626-
if ((boolean) getContextData(ctx, INTERNAL_PROXY_SERVER_CONNECTION_OPEN) && (boolean) getContextData(ctx, INTERNAL_CLIENT_CONNECTION_OPEN)) {
647+
if ((boolean) getContextData(ctx, INTERNAL_CLIENT_CONNECTION_OPEN) && (boolean) getContextData(ctx, INTERNAL_SERVER_CONNECTION_OPEN)) {
627648
// 若存在请求体,则将请求体复制。使用流式复制,避免占用大量内存
628649
if (clientReq.headers().contains("Content-Length") || clientReq.headers().contains("Transfer-Encoding")) {
629650
clientReq.send(serverReq).onComplete(sendRequestHandler(ctx, serverReq, serverResp, proxyUrl));
630651
} else {
631652
clientReq.send().onComplete(sendRequestHandler(ctx, serverReq, serverResp, proxyUrl));
632653
}
633-
} else if ((boolean) getContextData(ctx, INTERNAL_PROXY_SERVER_CONNECTION_OPEN) && !(boolean) getContextData(ctx, INTERNAL_CLIENT_CONNECTION_OPEN)) {
634-
// 整体链路连接不可用,释放资源
635-
clientReq.connection().close();
636654
}
637655
} else {
638656
badGateway(ctx, serverResp);
639-
Throwable e = ar.cause();
640-
log.error("{} {} open connection error", serverReq.method().name(), proxyUrl, e);
657+
log.error("{} {} open connection error", serverReq.method().name(), proxyUrl, ar.cause());
641658
}
642659

643660
};
@@ -659,6 +676,9 @@ protected void badGateway(RoutingContext ctx, HttpServerResponse serverResp) {
659676
*/
660677
protected Handler<RoutingContext> routingContextHandler(HttpClient httpClient) {
661678
return ctx -> {
679+
// 暂停流读取
680+
ctx.request().pause();
681+
662682
// vertx的uri()是包含query参数的。而path()才是我们常说的不带有query的uri
663683
// route不是线程安全的。route里的metadata应以路由为单元存储,而不是以请求为单元存储。一个路由会有很多请求。
664684
// 若想要以请求为单元存储数据,应该使用routingContext.put
@@ -670,11 +690,7 @@ protected Handler<RoutingContext> routingContextHandler(HttpClient httpClient) {
670690
// 记录请求开始时间
671691
setContextData(ctx, INTERNAL_SEND_TIMESTAMP, System.currentTimeMillis());
672692
// 记录连接状态
673-
setContextData(ctx, INTERNAL_CLIENT_CONNECTION_OPEN, true);
674-
675-
// 暂停流读取
676-
ctx.request().pause();
677-
693+
setContextData(ctx, INTERNAL_SERVER_CONNECTION_OPEN, true);
678694

679695
// 获取代理地址
680696
String proxyUrl = getProxyUrl(ctx, ctx.request(), ctx.response());
@@ -691,13 +707,15 @@ protected Handler<RoutingContext> routingContextHandler(HttpClient httpClient) {
691707
requestOptions.setMethod(ctx.request().method());
692708
requestOptions.setFollowRedirects(getContextData(ctx, P_FOLLOW_REDIRECTS) != null && Boolean.parseBoolean(getContextData(ctx, P_FOLLOW_REDIRECTS).toString()));
693709

694-
695710
// 注册客户端与代理服务之间连接的断开监听事件。可监听主动关闭和被动关闭
696711
setContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR, ctx.request().connection().remoteAddress().toString());
712+
setContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR, ctx.request().connection().localAddress().toString());
713+
714+
log.debug("{} <-- {} connected", getContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR), getContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR));
697715

698716
ctx.request().connection().closeHandler(v -> {
699-
setContextData(ctx, INTERNAL_CLIENT_CONNECTION_OPEN, false);
700-
log.debug("proxyServer remote connection {} closed", getContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR).toString());
717+
setContextData(ctx, INTERNAL_SERVER_CONNECTION_OPEN, false);
718+
log.debug("{} <-- {} closed", getContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR), getContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR));
701719
});
702720

703721
// 如果跨域由代理服务接管,那么针对跨域使用的OPTIONS预检请求,就由代理服务接管,而不经过实际的后端服务
@@ -721,7 +739,7 @@ protected Handler<RoutingContext> routingContextHandler(HttpClient httpClient) {
721739
}
722740

723741
// 请求
724-
if ((boolean) getContextData(ctx, INTERNAL_CLIENT_CONNECTION_OPEN)) {
742+
if ((boolean) getContextData(ctx, INTERNAL_SERVER_CONNECTION_OPEN)) {
725743
httpClient.request(requestOptions).onComplete(connectHandler(ctx, ctx.request(), ctx.response(), proxyUrl));
726744
}
727745
};

src/main/java/top/meethigher/proxy/tcp/ReverseTcpProxy.java

Lines changed: 42 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -46,64 +46,49 @@ protected ReverseTcpProxy(NetServer netServer, NetClient netClient,
4646
this.connectHandler = sourceSocket -> {
4747
// 暂停流读取
4848
sourceSocket.pause();
49-
netClient.connect(targetPort, targetHost)
50-
.onFailure(e -> {
51-
log.error("failed to connect to {}:{}", targetHost, targetPort, e);
52-
sourceSocket.close();
53-
})
54-
.onSuccess(targetSocket -> {
55-
SocketAddress sourceRemoteAddress = sourceSocket.remoteAddress();
56-
SocketAddress sourceLocalAddress = sourceSocket.localAddress();
57-
SocketAddress targetRemoteAddress = targetSocket.remoteAddress();
58-
SocketAddress targetLocalAddress = targetSocket.localAddress();
59-
log.debug("connected {} -- {} ({} -- {})", sourceRemoteAddress.toString(), sourceLocalAddress.toString(),
60-
targetLocalAddress.toString(), targetRemoteAddress.toString());
61-
62-
// 暂停流读取
63-
targetSocket.pause();
64-
65-
66-
sourceSocket.closeHandler(v -> targetSocket.close()).pipeTo(targetSocket, ar -> {
67-
if (ar.succeeded()) {
68-
log.debug("pipeTo successful. {} --> {} --> {} --> {}",
69-
sourceRemoteAddress,
70-
sourceLocalAddress,
71-
targetLocalAddress,
72-
targetRemoteAddress);
73-
} else {
74-
log.error("pipeTo failed. {} --> {} --> {} --> {}",
75-
sourceRemoteAddress,
76-
sourceLocalAddress,
77-
targetLocalAddress,
78-
targetRemoteAddress,
79-
ar.cause());
80-
}
81-
});
82-
targetSocket.closeHandler(v -> {
83-
sourceSocket.close();
84-
log.debug("closed {} -- {} ({} -- {})", sourceRemoteAddress.toString(), sourceLocalAddress.toString(),
85-
targetLocalAddress.toString(), targetRemoteAddress.toString());
86-
}).pipeTo(sourceSocket, ar -> {
87-
if (ar.succeeded()) {
88-
log.debug("pipeTo successful. {} <-- {} <-- {} <-- {}",
89-
sourceRemoteAddress,
90-
sourceLocalAddress,
91-
targetLocalAddress,
92-
targetRemoteAddress);
93-
} else {
94-
log.error("pipeTo failed. {} <-- {} <-- {} <-- {}",
95-
sourceRemoteAddress,
96-
sourceLocalAddress,
97-
targetLocalAddress,
98-
targetRemoteAddress,
99-
ar.cause());
100-
}
101-
});
102-
103-
// 恢复流读取
104-
sourceSocket.resume();
105-
targetSocket.resume();
49+
SocketAddress sourceRemote = sourceSocket.remoteAddress();
50+
SocketAddress sourceLocal = sourceSocket.localAddress();
51+
log.debug("{} <-- {} connected", sourceLocal, sourceRemote);
52+
sourceSocket.closeHandler(v -> log.debug("{} <-- {} closed", sourceLocal, sourceRemote));
53+
netClient.connect(targetPort, targetHost).onComplete(ar -> {
54+
if (ar.succeeded()) {
55+
NetSocket targetSocket = ar.result();
56+
targetSocket.pause();
57+
SocketAddress targetRemote = targetSocket.remoteAddress();
58+
SocketAddress targetLocal = targetSocket.localAddress();
59+
log.debug("{} --> {} connected", targetLocal, targetRemote);
60+
// feat: v1.0.5以前的版本,在closeHandler里面,将对端连接也关闭。比如targetSocket关闭时,则将sourceSocket也关闭。
61+
// 结果导致在转发短连接时,出现了bug。参考https://github.com/meethigher/tcp-reverse-proxy/issues/6
62+
targetSocket.closeHandler(v -> log.debug("{} --> {} closed", targetLocal, targetRemote));
63+
sourceSocket.pipeTo(targetSocket).onComplete(ar1 -> {
64+
if (ar1.succeeded()) {
65+
log.debug("pipeTo successful. {} --> {} --> {} --> {}",
66+
sourceRemote, sourceLocal, targetLocal, targetRemote);
67+
} else {
68+
log.error("pipeTo failed. {} --> {} --> {} --> {}",
69+
sourceRemote, sourceLocal, targetLocal, targetRemote,
70+
ar1.cause());
71+
}
10672
});
73+
targetSocket.pipeTo(sourceSocket).onComplete(ar1 -> {
74+
if (ar1.succeeded()) {
75+
log.debug("pipeTo successful. {} <-- {} <-- {} <-- {}",
76+
sourceRemote, sourceLocal, targetLocal, targetRemote);
77+
} else {
78+
log.error("pipeTo failed. {} <-- {} <-- {} <-- {}",
79+
sourceRemote, sourceLocal, targetLocal, targetRemote,
80+
ar1.cause());
81+
}
82+
});
83+
sourceSocket.resume();
84+
targetSocket.resume();
85+
86+
} else {
87+
log.error("failed to connect to {}:{}", targetHost, targetPort, ar.cause());
88+
// 若连接目标服务失败,需要断开源头服务
89+
sourceSocket.close();
90+
}
91+
});
10792
};
10893
}
10994

0 commit comments

Comments
 (0)