Skip to content

Commit 8f2a631

Browse files
committed
feat: 1. TunnelServer接收到未知消息或者未注册处理Handler的消息时,主动将连接关闭 2. 重新调整消息事件TunnelMessageType
(cherry picked from commit 2e7730121f7d4656509b46076e26ce96cc5be63b)
1 parent 9df1714 commit 8f2a631

14 files changed

+445
-992
lines changed

src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelClient.java

Lines changed: 51 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,20 @@ public class ReverseTcpProxyTunnelClient extends TunnelClient {
3131
protected static final long HEARTBEAT_DELAY_DEFAULT = 5000;// 毫秒
3232
protected static final long MIN_DELAY_DEFAULT = 1000;// 毫秒
3333
protected static final long MAX_DELAY_DEFAULT = 64000;// 毫秒
34-
protected static final String TOKEN_DEFAULT = "123456789";
34+
protected static final String SECRET_DEFAULT = "123456789";
3535
protected static final char[] ID_CHARACTERS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray();
3636

3737

3838
protected final long heartbeatDelay;
39-
protected final String token;
39+
protected final String secret;
4040
protected final String name;
4141

4242

43+
protected String localHost = "127.0.0.1";
44+
protected int localPort = 2222;
45+
protected int remotePort = 2222;
46+
47+
4348
protected static String generateName() {
4449
final String prefix = "ReverseTcpProxyTunnelClient-";
4550
try {
@@ -61,76 +66,88 @@ protected static String generateName() {
6166

6267
protected ReverseTcpProxyTunnelClient(Vertx vertx, NetClient netClient,
6368
long minDelay, long maxDelay, long heartbeatDelay,
64-
String token, String name) {
69+
String secret, String name) {
6570
super(vertx, netClient, minDelay, maxDelay);
6671
this.heartbeatDelay = heartbeatDelay;
67-
this.token = token;
72+
this.secret = secret;
6873
this.name = name;
6974
addMessageHandler();
7075
}
7176

77+
public ReverseTcpProxyTunnelClient localHost(String localHost) {
78+
this.localHost = localHost;
79+
return this;
80+
}
81+
82+
public ReverseTcpProxyTunnelClient localPort(int localPort) {
83+
this.localPort = localPort;
84+
return this;
85+
}
86+
87+
public ReverseTcpProxyTunnelClient remotePort(int remotePort) {
88+
this.remotePort = remotePort;
89+
return this;
90+
}
91+
7292
/**
7393
* 注册内网穿透的监听逻辑
7494
*/
7595
protected void addMessageHandler() {
7696
// 监听连接成功事件
77-
this.onConnected((vertx, netSocket, buffer) -> emit(TunnelMessageType.AUTH, TunnelMessage.Auth
78-
.newBuilder()
79-
.setToken(token)
80-
.build()
81-
.toByteArray()));
82-
// 监听授权响应事件
83-
this.on(TunnelMessageType.AUTH_ACK, new AbstractTunnelHandler() {
97+
this.onConnected((vertx, netSocket, buffer) -> netSocket.write(encode(TunnelMessageType.OPEN_DATA_PORT,
98+
TunnelMessage.OpenDataPort.newBuilder()
99+
.setSecret(secret)
100+
.setPort(remotePort)
101+
.build().toByteArray())));
102+
103+
// 监听授权与开通数据端口事件
104+
this.on(TunnelMessageType.OPEN_DATA_PORT_ACK, new AbstractTunnelHandler() {
84105
@Override
85106
protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType type, byte[] bodyBytes) {
86107
boolean result = false;
87108
try {
88-
TunnelMessage.AuthAck ack = TunnelMessage.AuthAck.parseFrom(bodyBytes);
89-
result = ack.getSuccess();
90-
if (result) {
91-
vertx.setTimer(heartbeatDelay, id -> emit(TunnelMessageType.HEARTBEAT,
92-
TunnelMessage.Heartbeat.newBuilder()
93-
.setTimestamp(System.currentTimeMillis())
94-
.build()
95-
.toByteArray()));
109+
TunnelMessage.OpenDataPortAck parsed = TunnelMessage.OpenDataPortAck.parseFrom(bodyBytes);
110+
if (parsed.getSuccess()) {
111+
// 如果认证 + 开通端口成功,那么就需要进行长连接保持,并开启定期心跳。
112+
vertx.setTimer(heartbeatDelay, id -> netSocket.write(encode(TunnelMessageType.HEARTBEAT,
113+
TunnelMessage.Heartbeat.newBuilder().setTimestamp(System.currentTimeMillis()).build().toByteArray())));
114+
} else {
115+
// 如果认证失败,服务端会主动关闭 tcp 连接
116+
log.warn("{} error : {}", TunnelMessageType.OPEN_DATA_PORT_ACK, parsed.getMessage());
96117
}
97118
} catch (Exception e) {
98119
}
99120
return result;
100121
}
101122
});
102-
// 监听心跳响应事件
123+
124+
// 监听心跳事件
103125
this.on(TunnelMessageType.HEARTBEAT_ACK, new AbstractTunnelHandler() {
104126
@Override
105127
protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType type, byte[] bodyBytes) {
106-
try {
107-
vertx.setTimer(heartbeatDelay, id -> emit(TunnelMessageType.HEARTBEAT,
108-
TunnelMessage.Heartbeat.newBuilder()
109-
.setTimestamp(System.currentTimeMillis())
110-
.build()
111-
.toByteArray()));
112-
} catch (Exception e) {
113-
}
128+
// 只要收到心跳,就证明没问题。不用去解析内容。直接开启下一波心跳计划即可。
129+
vertx.setTimer(heartbeatDelay, id -> netSocket.write(encode(TunnelMessageType.HEARTBEAT,
130+
TunnelMessage.Heartbeat.newBuilder().setTimestamp(System.currentTimeMillis()).build().toByteArray())));
114131
return true;
115132
}
116133
});
117134
}
118135

119-
public static ReverseTcpProxyTunnelClient create(Vertx vertx, NetClient netClient, long minDelay, long maxDelay, long heartbeatDelay, String token, String name) {
120-
return new ReverseTcpProxyTunnelClient(vertx, netClient, minDelay, maxDelay, heartbeatDelay, token, name);
136+
public static ReverseTcpProxyTunnelClient create(Vertx vertx, NetClient netClient, long minDelay, long maxDelay, long heartbeatDelay, String secret, String name) {
137+
return new ReverseTcpProxyTunnelClient(vertx, netClient, minDelay, maxDelay, heartbeatDelay, secret, name);
121138
}
122139

123-
public static ReverseTcpProxyTunnelClient create(Vertx vertx, NetClient netClient, String token) {
124-
return new ReverseTcpProxyTunnelClient(vertx, netClient, MIN_DELAY_DEFAULT, MAX_DELAY_DEFAULT, HEARTBEAT_DELAY_DEFAULT, token, generateName());
140+
public static ReverseTcpProxyTunnelClient create(Vertx vertx, NetClient netClient, String secret) {
141+
return new ReverseTcpProxyTunnelClient(vertx, netClient, MIN_DELAY_DEFAULT, MAX_DELAY_DEFAULT, HEARTBEAT_DELAY_DEFAULT, secret, generateName());
125142
}
126143

127144

128145
public static ReverseTcpProxyTunnelClient create(Vertx vertx, NetClient netClient) {
129-
return new ReverseTcpProxyTunnelClient(vertx, netClient, MIN_DELAY_DEFAULT, MAX_DELAY_DEFAULT, HEARTBEAT_DELAY_DEFAULT, TOKEN_DEFAULT, generateName());
146+
return new ReverseTcpProxyTunnelClient(vertx, netClient, MIN_DELAY_DEFAULT, MAX_DELAY_DEFAULT, HEARTBEAT_DELAY_DEFAULT, SECRET_DEFAULT, generateName());
130147
}
131148

132149
public static ReverseTcpProxyTunnelClient create(Vertx vertx) {
133-
return new ReverseTcpProxyTunnelClient(vertx, vertx.createNetClient(), MIN_DELAY_DEFAULT, MAX_DELAY_DEFAULT, HEARTBEAT_DELAY_DEFAULT, TOKEN_DEFAULT, generateName());
150+
return new ReverseTcpProxyTunnelClient(vertx, vertx.createNetClient(), MIN_DELAY_DEFAULT, MAX_DELAY_DEFAULT, HEARTBEAT_DELAY_DEFAULT, SECRET_DEFAULT, generateName());
134151
}
135152

136153

src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelServer.java

Lines changed: 23 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,20 @@ public class ReverseTcpProxyTunnelServer extends TunnelServer {
3333

3434
private static final Logger log = LoggerFactory.getLogger(ReverseTcpProxyTunnelServer.class);
3535
protected static final char[] ID_CHARACTERS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray();
36-
protected static final String TOKEN_DEFAULT = "123456789";
36+
protected static final String SECRECT_TOKEN = "123456789";
3737

3838

3939
protected String host = "0.0.0.0";
4040
protected int port = 44444;
4141
protected Set<NetSocket> authedSockets = new HashSet<>(); // 授权成功的socket列表
4242

43-
protected final String token;
43+
protected final String secret;
4444
protected final String name;
4545
protected final Handler<NetSocket> connectHandler;
4646

47-
public ReverseTcpProxyTunnelServer(Vertx vertx, NetServer netServer, String token, String name) {
47+
public ReverseTcpProxyTunnelServer(Vertx vertx, NetServer netServer, String secret, String name) {
4848
super(vertx, netServer);
49-
this.token = token;
49+
this.secret = secret;
5050
this.name = name;
5151
this.connectHandler = socket -> {
5252
socket.pause();
@@ -71,79 +71,46 @@ protected void addMessageHandler() {
7171
// 监听连接成功事件
7272
this.onConnected((vertx1, netSocket, buffer) -> log.debug("{} connected", netSocket.remoteAddress()));
7373

74-
// 监听授权消息
75-
this.on(TunnelMessageType.AUTH, new AbstractTunnelHandler() {
74+
// 监听授权与开通端口事件
75+
this.on(TunnelMessageType.OPEN_DATA_PORT, new AbstractTunnelHandler() {
7676
@Override
7777
protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType type, byte[] bodyBytes) {
78+
// 如果授权通过,并且成功开通端口。则返回成功;否则则返回失败,并关闭连接
7879
boolean result = false;
7980
try {
80-
TunnelMessage.Auth parsed = TunnelMessage.Auth.parseFrom(bodyBytes);
81-
if (token.equals(parsed.getToken())) {
82-
result = true;
83-
netSocket.write(encode(TunnelMessageType.AUTH_ACK,
84-
TunnelMessage.AuthAck.newBuilder()
85-
.setSuccess(result)
86-
.setMessage("success")
87-
.build().toByteArray()))
88-
.onComplete(ar -> {
89-
authedSockets.add(netSocket);
90-
});
81+
TunnelMessage.OpenDataPort parsed = TunnelMessage.OpenDataPort.parseFrom(bodyBytes);
82+
if (secret.equals(parsed.getSecret())) {
83+
final int tPort = parsed.getPort();
84+
9185
} else {
92-
netSocket.write(encode(TunnelMessageType.AUTH_ACK,
93-
TunnelMessage.AuthAck.newBuilder()
94-
.setSuccess(result)
95-
.setMessage("failure")
96-
.build().toByteArray()))
97-
.onComplete(ar -> {
98-
// 鉴权失败,服务端主动关闭连接
99-
netSocket.close();
100-
});
86+
netSocket.write(encode(TunnelMessageType.OPEN_DATA_PORT_ACK,
87+
TunnelMessage.OpenDataPortAck
88+
.newBuilder()
89+
.setSuccess(result)
90+
.setMessage("your secret is incorrect!")
91+
.build().toByteArray())).onComplete(ar -> netSocket.close());
10192
}
10293
} catch (Exception e) {
10394
}
10495
return result;
10596
}
10697
});
107-
108-
// 监听心跳
109-
this.on(TunnelMessageType.HEARTBEAT, new AbstractTunnelHandler() {
110-
@Override
111-
protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType type, byte[] bodyBytes) {
112-
if (authedSockets.contains(netSocket)) {
113-
netSocket.write(encode(TunnelMessageType.HEARTBEAT_ACK,
114-
TunnelMessage.HeartbeatAck.newBuilder().setTimestamp(System.currentTimeMillis()).build().toByteArray()));
115-
return true;
116-
} else {
117-
// 未经授权的连接,直接关闭
118-
netSocket.close();
119-
return false;
120-
}
121-
}
122-
});
123-
124-
// 监听开通端口请求
125-
this.on(TunnelMessageType.OPEN_PORT, new AbstractTunnelHandler() {
126-
@Override
127-
protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType type, byte[] bodyBytes) {
128-
return false;
129-
}
130-
});
13198
}
13299

133-
public static ReverseTcpProxyTunnelServer create(Vertx vertx, NetServer netServer, String token, String name) {
134-
return new ReverseTcpProxyTunnelServer(vertx, netServer, token, name);
100+
public static ReverseTcpProxyTunnelServer create(Vertx vertx, NetServer netServer, String secret, String name) {
101+
return new ReverseTcpProxyTunnelServer(vertx, netServer, secret, name);
135102
}
136103

137-
public static ReverseTcpProxyTunnelServer create(Vertx vertx, NetServer netServer, String token) {
138-
return new ReverseTcpProxyTunnelServer(vertx, netServer, token, generateName());
104+
public static ReverseTcpProxyTunnelServer create(Vertx vertx, NetServer netServer, String secret) {
105+
return new ReverseTcpProxyTunnelServer(vertx, netServer, secret, generateName());
139106
}
140107

141108
public static ReverseTcpProxyTunnelServer create(Vertx vertx, NetServer netServer) {
142-
return new ReverseTcpProxyTunnelServer(vertx, netServer, TOKEN_DEFAULT, generateName());
109+
return new ReverseTcpProxyTunnelServer(vertx, netServer, SECRECT_TOKEN, generateName());
143110
}
144111

145112
public static ReverseTcpProxyTunnelServer create(Vertx vertx) {
146-
return new ReverseTcpProxyTunnelServer(vertx, vertx.createNetServer(), TOKEN_DEFAULT, generateName());
113+
return new ReverseTcpProxyTunnelServer(vertx, vertx.createNetServer(), SECRECT_TOKEN, generateName());
147114
}
148115

149116

src/main/java/top/meethigher/proxy/tcp/tunnel/Tunnel.java

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package top.meethigher.proxy.tcp.tunnel;
22

3+
import io.vertx.core.Vertx;
34
import io.vertx.core.buffer.Buffer;
45
import io.vertx.core.net.NetSocket;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
58
import top.meethigher.proxy.tcp.tunnel.codec.TunnelMessageCodec;
69
import top.meethigher.proxy.tcp.tunnel.codec.TunnelMessageParser;
710
import top.meethigher.proxy.tcp.tunnel.codec.TunnelMessageType;
@@ -19,6 +22,13 @@
1922
*/
2023
public abstract class Tunnel {
2124

25+
private static final Logger log = LoggerFactory.getLogger(Tunnel.class);
26+
protected final Vertx vertx;
27+
28+
protected Tunnel(Vertx vertx) {
29+
this.vertx = vertx;
30+
}
31+
2232
/**
2333
* 监听消息类型 {@code TunnelMessageType} 以及触发的动作 {@code TunnelHandler}
2434
*/
@@ -27,16 +37,55 @@ public abstract class Tunnel {
2737
/**
2838
* 监听socket连接成功触发的动作
2939
*/
30-
public abstract void onConnected(TunnelHandler tunnelHandler);
40+
public void onConnected(TunnelHandler tunnelHandler) {
41+
tunnelHandlers.put(null, tunnelHandler);
42+
}
3143

3244
/**
3345
* 监听 {@code TunnelMessageType} 事件
3446
*/
35-
public abstract void on(TunnelMessageType type, TunnelHandler tunnelHandler);
47+
public void on(TunnelMessageType type, TunnelHandler tunnelHandler) {
48+
tunnelHandlers.put(type, tunnelHandler);
49+
}
3650

51+
/**
52+
* 将请求体按照{@code TunnelMessageCodec} 规范进行编码
53+
*
54+
* @param type 消息类型
55+
* @param bodyBytes 请求体
56+
* @return 编码结果
57+
*/
3758
public Buffer encode(TunnelMessageType type, byte[] bodyBytes) {
3859
return TunnelMessageCodec.encode(type.code(), bodyBytes);
3960
}
4061

41-
public abstract TunnelMessageParser decode(NetSocket socket);
62+
/**
63+
* 将请求体按照 {@code TunnelMessageCodec } 进行解码
64+
* <p>
65+
* 若传输过来的数据,不符合要求,就会将连接关闭,该操作适合 Server 端的安全防护,Client 端完全没必要这么严格。因此,Client 如果使用该方法的话,建议重写
66+
*
67+
* @param socket 连接
68+
* @return 解码器
69+
*/
70+
public TunnelMessageParser decode(final NetSocket socket) {
71+
return new TunnelMessageParser(buffer -> {
72+
TunnelMessageCodec.DecodedMessage decodedMessage = TunnelMessageCodec.decode(buffer);
73+
TunnelMessageType type = TunnelMessageType.fromCode(decodedMessage.type);
74+
for (TunnelMessageType tunnelMessageType : tunnelHandlers.keySet()) {
75+
if (type == tunnelMessageType) {
76+
TunnelHandler tunnelHandler = tunnelHandlers.get(tunnelMessageType);
77+
if (tunnelHandler != null) {
78+
tunnelHandler.handle(vertx, socket, buffer);
79+
} else {
80+
log.debug("no tunnel handler for {}, connection {} will be closed", type, socket.remoteAddress());
81+
socket.close();
82+
}
83+
return;
84+
}
85+
}
86+
log.debug("no tunnel handler for {}, connection {} will be closed", type, socket.remoteAddress());
87+
socket.close();
88+
return;
89+
}, socket);
90+
}
4291
}

src/main/java/top/meethigher/proxy/tcp/tunnel/TunnelClient.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ public abstract class TunnelClient extends Tunnel {
2727
*/
2828
protected long reconnectDelay;
2929

30-
protected final Vertx vertx;
31-
3230

3331
protected final NetClient netClient;
3432

@@ -51,24 +49,13 @@ public abstract class TunnelClient extends Tunnel {
5149

5250

5351
protected TunnelClient(Vertx vertx, NetClient netClient, long minDelay, long maxDelay) {
54-
this.vertx = vertx;
52+
super(vertx);
5553
this.netClient = netClient;
5654
this.minDelay = minDelay;
5755
this.maxDelay = maxDelay;
5856
setReconnectDelay(this.minDelay);
5957
}
6058

61-
62-
@Override
63-
public void onConnected(TunnelHandler tunnelHandler) {
64-
tunnelHandlers.put(null, tunnelHandler);
65-
}
66-
67-
@Override
68-
public void on(TunnelMessageType type, TunnelHandler tunnelHandler) {
69-
tunnelHandlers.put(type, tunnelHandler);
70-
}
71-
7259
@Override
7360
public TunnelMessageParser decode(final NetSocket socket) {
7461
return new TunnelMessageParser(buffer -> {
@@ -80,6 +67,7 @@ public TunnelMessageParser decode(final NetSocket socket) {
8067
if (tunnelHandler != null) {
8168
tunnelHandler.handle(vertx, socket, buffer);
8269
}
70+
return;
8371
}
8472
}
8573
}, socket);
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package top.meethigher.proxy.tcp.tunnel;
2+
3+
public abstract class TunnelDataServer {
4+
}

0 commit comments

Comments
 (0)