Skip to content

Commit 91d713a

Browse files
authored
Merge pull request #21 from meethigher/issue-18
在pipe前添加针对连接的异常处理器
2 parents 0d94d64 + 90a3d63 commit 91d713a

File tree

6 files changed

+121
-59
lines changed

6 files changed

+121
-59
lines changed

src/main/java/top/meethigher/proxy/http/ReverseHttpProxy.java

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package top.meethigher.proxy.http;
22

33
import io.vertx.core.AsyncResult;
4-
import io.vertx.core.Future;
54
import io.vertx.core.Handler;
65
import io.vertx.core.Vertx;
76
import io.vertx.core.http.*;
@@ -339,18 +338,11 @@ public ReverseHttpProxy host(String host) {
339338
}
340339

341340
public void start() {
342-
httpServer.requestHandler(router).exceptionHandler(e -> log.error("request failed", e));
343-
Future<HttpServer> listenFuture = httpServer.listen(sourcePort, sourceHost);
344-
345-
Handler<AsyncResult<HttpServer>> asyncResultHandler = ar -> {
346-
if (ar.succeeded()) {
347-
log.info("{} started on {}:{}", name, sourceHost, sourcePort);
348-
} else {
349-
log.error("{} start failed", name, ar.cause());
350-
351-
}
352-
};
353-
listenFuture.onComplete(asyncResultHandler);
341+
httpServer.requestHandler(router)
342+
.exceptionHandler(e -> log.error("{} socket errors happening before the HTTP connection", name, e))
343+
.listen(sourcePort, sourceHost)
344+
.onFailure(e -> log.error("{} start failed", name, e))
345+
.onSuccess(v -> log.info("{} started on {}:{}", name, sourceHost, sourcePort));
354346
}
355347

356348
public void stop() {
@@ -635,10 +627,14 @@ protected Handler<AsyncResult<HttpClientRequest>> connectHandler(RoutingContext
635627
setContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR, connection.remoteAddress().toString());
636628
log.debug("target {} -- {} connected", getContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR), getContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR));
637629

638-
connection.closeHandler(v -> {
639-
setContextData(ctx, INTERNAL_CLIENT_CONNECTION_OPEN, false);
640-
log.debug("target {} -- {} closed", getContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR), getContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR));
641-
});
630+
// 由于内部都是使用pipe来进行数据传输,所以exceptionHandler肯定是都重新注册过了,参考{@code io.vertx.core.streams.impl.PipeImpl.PipeImpl }
631+
// 但如果还没进入pipe前,连接出现异常,那么就会触发此处的exceptionHandler。https://github.com/meethigher/tcp-reverse-proxy/issues/18
632+
connection.exceptionHandler(e ->
633+
log.error("target {} -- {} exception occurred", getContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR), getContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR), e))
634+
.closeHandler(v -> {
635+
setContextData(ctx, INTERNAL_CLIENT_CONNECTION_OPEN, false);
636+
log.debug("target {} -- {} closed", getContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR), getContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR));
637+
});
642638

643639

644640
// 复制请求头。复制的过程中忽略逐跳标头
@@ -675,15 +671,21 @@ protected Handler<RoutingContext> routingContextHandler(HttpClient httpClient) {
675671
return ctx -> {
676672
// 暂停流读取
677673
ctx.request().pause();
678-
679674
HttpConnection connection = ctx.request().connection();
680675
setContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR, connection.remoteAddress().toString());
681676
setContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR, connection.localAddress().toString());
677+
log.debug("source {} -- {} connected", getContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR), getContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR));
678+
// 由于内部都是使用pipe来进行数据传输,所以exceptionHandler肯定是都重新注册过了,参考{@code io.vertx.core.streams.impl.PipeImpl.PipeImpl }
679+
// 但如果还没进入pipe前,连接出现异常,那么就会触发此处的exceptionHandler。https://github.com/meethigher/tcp-reverse-proxy/issues/18
680+
connection.exceptionHandler(e -> log.error("source {} -- {} exception occurred", getContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR), getContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR), e))
681+
.closeHandler(v -> {
682+
setContextData(ctx, INTERNAL_SERVER_CONNECTION_OPEN, false);
683+
log.debug("source {} -- {} closed", getContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR), getContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR));
684+
});
682685
// 记录请求开始时间
683686
setContextData(ctx, INTERNAL_SEND_TIMESTAMP, System.currentTimeMillis());
684687
// 记录连接状态
685688
setContextData(ctx, INTERNAL_SERVER_CONNECTION_OPEN, true);
686-
log.debug("source {} -- {} connected", getContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR), getContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR));
687689

688690
// vertx的uri()是包含query参数的。而path()才是我们常说的不带有query的uri
689691
// route不是线程安全的。route里的metadata应以路由为单元存储,而不是以请求为单元存储。一个路由会有很多请求。
@@ -708,10 +710,6 @@ protected Handler<RoutingContext> routingContextHandler(HttpClient httpClient) {
708710
requestOptions.setMethod(ctx.request().method());
709711
requestOptions.setFollowRedirects(getContextData(ctx, P_FOLLOW_REDIRECTS) != null && Boolean.parseBoolean(getContextData(ctx, P_FOLLOW_REDIRECTS).toString()));
710712

711-
connection.closeHandler(v -> {
712-
setContextData(ctx, INTERNAL_SERVER_CONNECTION_OPEN, false);
713-
log.debug("source {} -- {} closed", getContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR), getContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR));
714-
});
715713

716714
// 如果跨域由代理服务接管,那么针对跨域使用的OPTIONS预检请求,就由代理服务接管,而不经过实际的后端服务
717715
if (HttpMethod.OPTIONS.name().equalsIgnoreCase(ctx.request().method().name()) &&

src/main/java/top/meethigher/proxy/tcp/ReverseTcpProxy.java

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package top.meethigher.proxy.tcp;
22

3-
import io.vertx.core.AsyncResult;
4-
import io.vertx.core.Future;
53
import io.vertx.core.Handler;
64
import io.vertx.core.Vertx;
75
import io.vertx.core.net.NetClient;
@@ -54,7 +52,10 @@ protected ReverseTcpProxy(NetServer netServer, NetClient netClient,
5452
sourceSocket.pause();
5553
SocketAddress sourceRemote = sourceSocket.remoteAddress();
5654
SocketAddress sourceLocal = sourceSocket.localAddress();
57-
sourceSocket.closeHandler(v -> log.debug("source {} -- {} closed", sourceLocal, sourceRemote));
55+
// 由于内部都是使用pipe来进行数据传输,所以exceptionHandler肯定是都重新注册过了,参考{@code io.vertx.core.streams.impl.PipeImpl.PipeImpl }
56+
// 但如果还没进入pipe前,连接出现异常,那么就会触发此处的exceptionHandler。https://github.com/meethigher/tcp-reverse-proxy/issues/18
57+
sourceSocket.exceptionHandler(e -> log.error("source {} -- {} exception occurred", sourceLocal, sourceRemote, e))
58+
.closeHandler(v -> log.debug("source {} -- {} closed", sourceLocal, sourceRemote));
5859
NetAddress next = lb.next();
5960
String targetHost = next.getHost();
6061
int targetPort = next.getPort();
@@ -64,7 +65,7 @@ protected ReverseTcpProxy(NetServer netServer, NetClient netClient,
6465
);
6566
netClient.connect(targetPort, targetHost)
6667
.onFailure(e -> {
67-
log.error("failed to connect to {}:{}", targetHost, targetPort, e);
68+
log.error("source {} -- {} failed to connect to {}:{}", sourceLocal, sourceRemote, targetHost, targetPort, e);
6869
// 若连接目标服务失败,需要断开源头服务
6970
sourceSocket.close();
7071
})
@@ -76,7 +77,10 @@ protected ReverseTcpProxy(NetServer netServer, NetClient netClient,
7677

7778
// feat: v1.0.5以前的版本,在closeHandler里面,将对端连接也关闭。比如targetSocket关闭时,则将sourceSocket也关闭。
7879
// 结果导致在转发短连接时,出现了bug。参考https://github.com/meethigher/tcp-reverse-proxy/issues/6
79-
targetSocket.closeHandler(v -> log.debug("target {} -- {} closed", targetLocal, targetRemote));
80+
// 由于内部都是使用pipe来进行数据传输,所以exceptionHandler肯定是都重新注册过了,参考{@code io.vertx.core.streams.impl.PipeImpl.PipeImpl }
81+
// 但如果还没进入pipe前,连接出现异常,那么就会触发此处的exceptionHandler。https://github.com/meethigher/tcp-reverse-proxy/issues/18
82+
targetSocket.exceptionHandler(e -> log.error("target {} -- {} exception occurred", targetLocal, targetRemote, e))
83+
.closeHandler(v -> log.debug("target {} -- {} closed", targetLocal, targetRemote));
8084

8185
// https://github.com/meethigher/tcp-reverse-proxy/issues/12
8286
// 将日志记录详细,便于排查问题
@@ -193,18 +197,11 @@ public void start() {
193197
if (netAddresses.size() <= 0) {
194198
throw new IllegalStateException("netAddresses size must be greater than 0");
195199
}
196-
netServer.connectHandler(connectHandler).exceptionHandler(e -> log.error("connect failed", e));
197-
Future<NetServer> listenFuture = netServer.listen(sourcePort, sourceHost);
198-
199-
Handler<AsyncResult<NetServer>> asyncResultHandler = ar -> {
200-
if (ar.succeeded()) {
201-
log.info("{} started on {}:{}\nLB-Mode: {}\n {}", name, sourceHost, sourcePort, lb.name(), netAddresses);
202-
} else {
203-
Throwable e = ar.cause();
204-
log.error("{} start failed", name, e);
205-
}
206-
};
207-
listenFuture.onComplete(asyncResultHandler);
200+
netServer.connectHandler(connectHandler)
201+
.exceptionHandler(e -> log.error("{} socket errors happening before the connection is passed to the connectHandler", name, e))
202+
.listen(sourcePort, sourceHost)
203+
.onFailure(e -> log.error("{} start failed", name, e))
204+
.onSuccess(v -> log.info("{} started on {}:{}\nLB-Mode: {}\n {}", name, sourceHost, sourcePort, lb.name(), netAddresses));
208205
}
209206

210207
public void stop() {

src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelClient.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,10 @@ protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType t
247247
// 双向生命周期绑定、双向数据转发
248248
// feat: v1.0.5以前的版本,在closeHandler里面,将对端连接也关闭。比如targetSocket关闭时,则将sourceSocket也关闭。
249249
// 结果导致在转发短连接时,出现了bug。参考https://github.com/meethigher/tcp-reverse-proxy/issues/6
250-
dataSocket.closeHandler(v -> log.debug("{}: sessionId {}, data connection {} -- {} closed", dataProxyName, sessionId, dataSocket.remoteAddress(), dataSocket.localAddress()))
250+
// 由于内部都是使用pipe来进行数据传输,所以exceptionHandler肯定是都重新注册过了,参考{@code io.vertx.core.streams.impl.PipeImpl.PipeImpl }
251+
// 但如果还没进入pipe前,连接出现异常,那么就会触发此处的exceptionHandler。https://github.com/meethigher/tcp-reverse-proxy/issues/18
252+
dataSocket.exceptionHandler(e -> log.error("{}: sessionId {}, data connection {} -- {} exception occurred", dataProxyName, sessionId, dataSocket.remoteAddress(), dataSocket.localAddress(), e))
253+
.closeHandler(v -> log.debug("{}: sessionId {}, data connection {} -- {} closed", dataProxyName, sessionId, dataSocket.remoteAddress(), dataSocket.localAddress()))
251254
.pipeTo(backendSocket)
252255
.onFailure(e -> log.error("{}: sessionId {}, data connection {} -- {} pipe to backend connection {} -- {} failed",
253256
dataProxyName,
@@ -260,7 +263,10 @@ protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType t
260263
sessionId,
261264
dataSocket.remoteAddress(), dataSocket.localAddress(),
262265
backendSocket.remoteAddress(), backendSocket.localAddress()));
263-
backendSocket.closeHandler(v -> log.debug("{}: sessionId {}, backend connection {} -- {} closed", dataProxyName, sessionId, backendSocket.remoteAddress(), backendSocket.localAddress()))
266+
// 由于内部都是使用pipe来进行数据传输,所以exceptionHandler肯定是都重新注册过了,参考{@code io.vertx.core.streams.impl.PipeImpl.PipeImpl }
267+
// 但如果还没进入pipe前,连接出现异常,那么就会触发此处的exceptionHandler。https://github.com/meethigher/tcp-reverse-proxy/issues/18
268+
backendSocket.exceptionHandler(e -> log.error("{}: sessionId {}, backend connection {} -- {} exception occurred", dataProxyName, sessionId, backendSocket.remoteAddress(), backendSocket.localAddress(), e))
269+
.closeHandler(v -> log.debug("{}: sessionId {}, backend connection {} -- {} closed", dataProxyName, sessionId, backendSocket.remoteAddress(), backendSocket.localAddress()))
264270
.pipeTo(dataSocket)
265271
.onFailure(e -> log.error("{}: sessionId {}, backend connection {} -- {} pipe to data connection {} -- {} failed",
266272
dataProxyName,

src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelServer.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package top.meethigher.proxy.tcp.tunnel;
22

33

4-
import io.vertx.core.AsyncResult;
5-
import io.vertx.core.Handler;
64
import io.vertx.core.Vertx;
75
import io.vertx.core.buffer.Buffer;
86
import io.vertx.core.net.NetServer;
@@ -145,16 +143,11 @@ public static String generateName() {
145143
}
146144

147145
public void start() {
148-
Handler<AsyncResult<NetServer>> asyncResultHandler = ar -> {
149-
if (ar.succeeded()) {
150-
log.info("{} started on {}:{}", name, host, port);
151-
} else {
152-
Throwable e = ar.cause();
153-
log.error("{} start failed", name, e);
154-
}
155-
};
156-
netServer.connectHandler(this::handleConnect).exceptionHandler(e -> log.error("connect failed", e));
157-
netServer.listen(port, host).onComplete(asyncResultHandler);
146+
netServer.connectHandler(this::handleConnect)
147+
.exceptionHandler(e -> log.error("{}: socket errors happening before the connection is passed to the connectHandler", name, e))
148+
.listen(port, host)
149+
.onFailure(e -> log.error("{} start failed", name, e))
150+
.onSuccess(v -> log.info("{} started on {}:{}", name, host, port));
158151
}
159152

160153
public void stop() {
@@ -301,7 +294,10 @@ protected void bindConnections(UserConnection userConn, NetSocket dataSocket, in
301294
// 双向生命周期绑定、双向数据转发
302295
// feat: v1.0.5以前的版本,在closeHandler里面,将对端连接也关闭。比如targetSocket关闭时,则将sourceSocket也关闭。
303296
// 结果导致在转发短连接时,出现了bug。参考https://github.com/meethigher/tcp-reverse-proxy/issues/6
304-
userSocket.closeHandler(v -> log.debug("{}: sessionId {}, user connection {} -- {} closed", name, sessionId, userSocket.remoteAddress(), userSocket.localAddress()))
297+
// 由于内部都是使用pipe来进行数据传输,所以exceptionHandler肯定是都重新注册过了,参考{@code io.vertx.core.streams.impl.PipeImpl.PipeImpl }
298+
// 但如果还没进入pipe前,连接出现异常,那么就会触发此处的exceptionHandler。https://github.com/meethigher/tcp-reverse-proxy/issues/18
299+
userSocket.exceptionHandler(e -> log.error("{}: sessionId {}, user connection {} -- {} exception occurred", name, sessionId, userSocket.remoteAddress(), userSocket.localAddress(), e))
300+
.closeHandler(v -> log.debug("{}: sessionId {}, user connection {} -- {} closed", name, sessionId, userSocket.remoteAddress(), userSocket.localAddress()))
305301
.pipeTo(dataSocket)
306302
.onFailure(e -> log.error("{}: sessionId {}, user connection {} -- {} pipe to data connection {} -- {} failed",
307303
name,
@@ -311,10 +307,10 @@ protected void bindConnections(UserConnection userConn, NetSocket dataSocket, in
311307
name,
312308
sessionId,
313309
userSocket.remoteAddress(), userSocket.localAddress(), dataSocket.remoteAddress(), dataSocket.localAddress()));
314-
dataSocket.closeHandler(v -> log.debug("{}: sessionId {}, data connection {} -- {} closed",
315-
name,
316-
sessionId,
317-
dataSocket.remoteAddress(), dataSocket.localAddress()))
310+
// 由于内部都是使用pipe来进行数据传输,所以exceptionHandler肯定是都重新注册过了,参考{@code io.vertx.core.streams.impl.PipeImpl.PipeImpl }
311+
// 但如果还没进入pipe前,连接出现异常,那么就会触发此处的exceptionHandler。https://github.com/meethigher/tcp-reverse-proxy/issues/18
312+
dataSocket.exceptionHandler(e -> log.error("{}: sessionId {}, data connection {} -- {} exception occurred", name, sessionId, dataSocket.remoteAddress(), dataSocket.localAddress(), e))
313+
.closeHandler(v -> log.debug("{}: sessionId {}, data connection {} -- {} closed", name, sessionId, dataSocket.remoteAddress(), dataSocket.localAddress()))
318314
.pipeTo(userSocket)
319315
.onFailure(e -> log.error("{}: sessionId {}, data connection {} -- {} pipe to user connection {} -- {} failed",
320316
name,
@@ -347,6 +343,7 @@ protected void bindConnections(UserConnection userConn, NetSocket dataSocket, in
347343
public void start() {
348344
this.netServer
349345
.connectHandler(this::handleConnect)
346+
.exceptionHandler(e -> log.error("{} socket errors happening before the connection is passed to the connectHandler", name, e))
350347
.listen(port, host)
351348
.onComplete(ar -> {
352349
if (ar.succeeded()) {
@@ -369,6 +366,7 @@ public boolean startSync() {
369366
AtomicBoolean success = new AtomicBoolean(false);
370367
this.netServer
371368
.connectHandler(this::handleConnect)
369+
.exceptionHandler(e -> log.error("{} socket errors happening before the connection is passed to the connectHandler", name, e))
372370
.listen(port, host)
373371
.onComplete(ar -> {
374372
if (ar.succeeded()) {

0 commit comments

Comments
 (0)