7
7
import org .slf4j .Logger ;
8
8
import org .slf4j .LoggerFactory ;
9
9
import top .meethigher .proxy .NetAddress ;
10
+ import top .meethigher .proxy .tcp .mux .model .MuxConfiguration ;
10
11
11
12
import java .util .concurrent .ThreadLocalRandom ;
12
13
@@ -38,11 +39,6 @@ protected ReverseTcpProxyMuxServer(Vertx vertx, String secret, NetServer netServ
38
39
39
40
protected void handleConnect (NetSocket src ) {
40
41
src .pause ();
41
- log .debug ("source {} -- {} connected" , src .localAddress (), src .remoteAddress ());
42
- // 由于内部都是使用pipe来进行数据传输,所以exceptionHandler肯定是都重新注册过了,参考{@code io.vertx.core.streams.impl.PipeImpl.PipeImpl }
43
- // 但如果还没进入pipe前,连接出现异常,那么就会触发此处的exceptionHandler。https://github.com/meethigher/tcp-reverse-proxy/issues/18
44
- src .exceptionHandler (e -> log .error ("source {} -- {} exception occurred" , src .localAddress (), src .remoteAddress (), e ))
45
- .closeHandler (v -> log .debug ("source {} -- {} closed" , src .localAddress (), src .remoteAddress ()));
46
42
src .handler (new MuxMessageParser (muxMsg -> this .bindMuxConnections (src , muxMsg ), src ));
47
43
src .resume ();
48
44
}
@@ -52,26 +48,32 @@ protected void handleConnect(NetSocket src) {
52
48
*/
53
49
protected void bindMuxConnections (NetSocket src , MuxMessageParser .MuxMessage muxMsg ) {
54
50
src .pause ();
55
- NetAddress backend = aesBase64Decode (muxMsg .backendServerBuf );
56
- if (backend == null ) {
51
+ MuxConfiguration cfg = aesBase64Decode (muxMsg .backendServerBuf );
52
+ if (cfg == null ) {
57
53
log .warn ("source {} -- {} exception occurred: failed to parsing the backendServer address from encrypted content:{}" ,
58
54
src .localAddress (), src .remoteAddress (),
59
55
muxMsg .backendServerBuf );
60
56
src .close ();
61
57
return ;
62
58
}
59
+ NetAddress backend = cfg .backendServer ;
60
+ log .debug ("{}: sessionId {}, source {} -- {} connected" , cfg .name , cfg .sessionId , src .localAddress (), src .remoteAddress ());
61
+ // 由于内部都是使用pipe来进行数据传输,所以exceptionHandler肯定是都重新注册过了,参考{@code io.vertx.core.streams.impl.PipeImpl.PipeImpl }
62
+ // 但如果还没进入pipe前,连接出现异常,那么就会触发此处的exceptionHandler。https://github.com/meethigher/tcp-reverse-proxy/issues/18
63
+ src .exceptionHandler (e -> log .error ("{}: sessionId {}, source {} -- {} exception occurred" , cfg .name , cfg .sessionId , src .localAddress (), src .remoteAddress (), e ))
64
+ .closeHandler (v -> log .debug ("{}: sessionId {}, source {} -- {} closed" , cfg .name , cfg .sessionId , src .localAddress (), src .remoteAddress ()));
63
65
netClient .connect (backend .getPort (), backend .getHost ())
64
66
.onFailure (e -> {
65
- log .error ("source {} -- {} failed to connect to {}" , src .localAddress (), src .remoteAddress (), backend , e );
67
+ log .error ("{}: sessionId {}, source {} -- {} failed to connect to {}" , cfg . name , cfg . sessionId , src .localAddress (), src .remoteAddress (), backend , e );
66
68
src .close ();
67
69
})
68
70
.onSuccess (dst -> {
69
71
dst .pause ();
70
- log .debug ("target {} -- {} connected" , dst .localAddress (), dst .remoteAddress ());
72
+ log .debug ("{}: sessionId {}, target {} -- {} connected" , cfg . name , cfg . sessionId , dst .localAddress (), dst .remoteAddress ());
71
73
// 由于内部都是使用pipe来进行数据传输,所以exceptionHandler肯定是都重新注册过了,参考{@code io.vertx.core.streams.impl.PipeImpl.PipeImpl }
72
74
// 但如果还没进入pipe前,连接出现异常,那么就会触发此处的exceptionHandler。https://github.com/meethigher/tcp-reverse-proxy/issues/18
73
- dst .exceptionHandler (e -> log .error ("target {} -- {} exception occurred" , dst .localAddress (), dst .remoteAddress (), e ))
74
- .closeHandler (v -> log .debug ("target {} -- {} closed" , dst .localAddress (), dst .remoteAddress ()));
75
+ dst .exceptionHandler (e -> log .error ("{}: sessionId {}, target {} -- {} exception occurred" , cfg . name , cfg . sessionId , dst .localAddress (), dst .remoteAddress (), e ))
76
+ .closeHandler (v -> log .debug ("{}: sessionId {}, target {} -- {} closed" , cfg . name , cfg . sessionId , dst .localAddress (), dst .remoteAddress ()));
75
77
/**
76
78
* 不能使用write的成功与否判断链路是否正常,但是可以通过write.onSuccess保证顺序写入。
77
79
* 测试中发现,即便链路异常,返回仍然是true 参考 https://github.com/meethigher/bug-test/blob/vertx-network-disconnect/src/main/java/top/meethigher/BugTest.java
@@ -83,16 +85,18 @@ protected void bindMuxConnections(NetSocket src, MuxMessageParser.MuxMessage mux
83
85
// https://github.com/meethigher/tcp-reverse-proxy/issues/12
84
86
// 将日志记录详细,便于排查问题
85
87
src .pipeTo (dst )
86
- .onSuccess (v -> log .debug ("source {} -- {} pipe to target {} -- {} succeeded" ,
87
- src .localAddress (), src .remoteAddress (), dst .localAddress (), dst .remoteAddress ()))
88
- .onFailure (e -> log .error ("source {} -- {} pipe to target {} -- {} failed" ,
89
- src .localAddress (), src .remoteAddress (), dst .localAddress (), dst .remoteAddress (), e ));
88
+ .onSuccess (v -> log .debug ("{}: sessionId {}, source {} -- {} pipe to target {} -- {} succeeded" ,
89
+ cfg . name , cfg . sessionId , src .localAddress (), src .remoteAddress (), dst .localAddress (), dst .remoteAddress ()))
90
+ .onFailure (e -> log .error ("{}: sessionId {}, source {} -- {} pipe to target {} -- {} failed" ,
91
+ cfg . name , cfg . sessionId , src .localAddress (), src .remoteAddress (), dst .localAddress (), dst .remoteAddress (), e ));
90
92
dst .pipeTo (src )
91
- .onSuccess (v -> log .debug ("target {} -- {} pipe to source {} -- {} succeeded" ,
92
- dst .localAddress (), dst .remoteAddress (), src .localAddress (), src .remoteAddress ()))
93
- .onFailure (e -> log .error ("target {} -- {} pipe to source {} -- {} failed" ,
94
- dst .localAddress (), dst .remoteAddress (), src .localAddress (), src .remoteAddress (), e ));
95
- log .debug ("source {} -- {} bound to target {} -- {}" , src .localAddress (), src .remoteAddress (),
93
+ .onSuccess (v -> log .debug ("{}: sessionId {}, target {} -- {} pipe to source {} -- {} succeeded" ,
94
+ cfg .name , cfg .sessionId , dst .localAddress (), dst .remoteAddress (), src .localAddress (), src .remoteAddress ()))
95
+ .onFailure (e -> log .error ("{}: sessionId {}, target {} -- {} pipe to source {} -- {} failed" ,
96
+ cfg .name , cfg .sessionId , dst .localAddress (), dst .remoteAddress (), src .localAddress (), src .remoteAddress (), e ));
97
+ log .debug ("{}: sessionId {}, source {} -- {} bound to target {} -- {}" ,
98
+ cfg .name , cfg .sessionId ,
99
+ src .localAddress (), src .remoteAddress (),
96
100
dst .localAddress (), dst .remoteAddress ());
97
101
src .resume ();
98
102
dst .resume ();
0 commit comments