Skip to content

Commit 2e5f45f

Browse files
committed
feat: 完成tcpmuxserver的解码逻辑、并添加相应的单元测试
1 parent 6c2a8b5 commit 2e5f45f

File tree

4 files changed

+230
-6
lines changed

4 files changed

+230
-6
lines changed

src/main/java/top/meethigher/proxy/tcp/mux/Mux.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ public abstract class Mux {
2727
/**
2828
* 默认对称加密密钥
2929
*/
30-
protected static final String SECRET_DEFAULT = "1234567890abcdef";
30+
public static final String SECRET_DEFAULT = "1234567890abcdef";
31+
32+
public static final short type = 5209;
3133

3234
protected static final char[] ID_CHARACTERS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray();
3335

@@ -49,7 +51,7 @@ public Buffer aesBase64Encode(NetAddress netAddress) {
4951
String addr = netAddress.toString();
5052
SecretKey key = restoreKey(secret.getBytes(StandardCharsets.UTF_8));
5153
String encryptedAddr = encryptToBase64(addr.getBytes(StandardCharsets.UTF_8), key);
52-
return TunnelMessageCodec.encode((short) 0, encryptedAddr.getBytes(StandardCharsets.UTF_8));
54+
return TunnelMessageCodec.encode(type, encryptedAddr.getBytes(StandardCharsets.UTF_8));
5355
}
5456

5557
/**
@@ -64,4 +66,5 @@ public NetAddress aesBase64Decode(Buffer buffer) {
6466
return NetAddress.parse(addr);
6567
}
6668

69+
6770
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package top.meethigher.proxy.tcp.mux;
2+
3+
import io.vertx.core.Handler;
4+
import io.vertx.core.buffer.Buffer;
5+
import io.vertx.core.net.NetSocket;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import top.meethigher.proxy.tcp.tunnel.codec.TunnelMessageParser;
9+
10+
/**
11+
* 适用于TcpMux的通信消息解析器
12+
* 只解析第一条数据
13+
*
14+
* @author <a href="https://meethigher.top">chenchuancheng</a>
15+
* @since 2025/07/27 02:19
16+
*/
17+
public class MuxMessageParser extends TunnelMessageParser {
18+
19+
public static final class MuxMessage {
20+
// muxclient发送给muxserver的backendServer配置信息
21+
public final Buffer backendServerBuf;
22+
// user发给muxclient的信息
23+
public final Buffer payload;
24+
25+
public MuxMessage(Buffer backendServerBuf, Buffer payload) {
26+
this.backendServerBuf = backendServerBuf;
27+
this.payload = payload;
28+
}
29+
}
30+
31+
32+
private static final Logger log = LoggerFactory.getLogger(MuxMessageParser.class);
33+
34+
protected final Handler<MuxMessage> muxMessageHandler;
35+
36+
public MuxMessageParser(Handler<MuxMessage> muxMessageHandler, NetSocket netSocket) {
37+
// 用不到这个父级的handler,传null即可
38+
super(null, netSocket);
39+
this.muxMessageHandler = muxMessageHandler;
40+
}
41+
42+
43+
@Override
44+
protected void parse() {
45+
// 获取消息的预设总长度
46+
int totalLength = buf.getInt(lengthFieldOffset);
47+
// 校验预设总长度
48+
if (buf.length() < totalLength) {
49+
return;
50+
}
51+
// 校验最大长度
52+
if (totalLength > maxLength) {
53+
log.warn("too many bytes in length field, {} > {}, connection {} -- {} will be closed",
54+
totalLength, maxLength,
55+
netSocket.localAddress(),
56+
netSocket.remoteAddress());
57+
netSocket.close();
58+
return;
59+
}
60+
// 校验类型编码是否在预设范围内
61+
if (totalLength >= (lengthFieldLength + typeFieldLength)) {
62+
short code = buf.getShort(lengthFieldLength);
63+
if (Mux.type == code) {
64+
65+
} else {
66+
log.warn("invalid type, connection {} -- {} will be closed",
67+
netSocket.localAddress(),
68+
netSocket.remoteAddress());
69+
netSocket.close();
70+
return;
71+
}
72+
73+
}
74+
muxMessageHandler.handle(new MuxMessage(buf.getBuffer(0, totalLength), buf.getBuffer(totalLength, buf.length())));
75+
}
76+
}

src/main/java/top/meethigher/proxy/tcp/mux/ReverseTcpProxyMuxServer.java

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
package top.meethigher.proxy.tcp.mux;
22

33
import io.vertx.core.Vertx;
4+
import io.vertx.core.net.NetClient;
5+
import io.vertx.core.net.NetServer;
6+
import io.vertx.core.net.NetSocket;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
import top.meethigher.proxy.NetAddress;
410

511
import java.util.concurrent.ThreadLocalRandom;
612

@@ -15,12 +21,54 @@
1521
*/
1622
public class ReverseTcpProxyMuxServer extends Mux {
1723

18-
// protected final NetServer netServer;
19-
// protected final NetClient netClient;
20-
// protected final String name;
24+
private static final Logger log = LoggerFactory.getLogger(ReverseTcpProxyMuxServer.class);
25+
protected final NetServer netServer;
26+
protected final NetClient netClient;
27+
protected final String name;
2128

22-
public ReverseTcpProxyMuxServer(Vertx vertx, String secret) {
29+
protected String host = "0.0.0.0";
30+
protected int port = 997;
31+
32+
protected ReverseTcpProxyMuxServer(Vertx vertx, String secret, NetServer netServer, NetClient netClient, String name) {
2333
super(vertx, secret);
34+
this.netServer = netServer;
35+
this.netClient = netClient;
36+
this.name = name;
37+
}
38+
39+
protected void handleConnect(NetSocket src) {
40+
src.pause();
41+
log.debug("source {} -- {} connected", src.localAddress(), src.remoteAddress());
42+
src.exceptionHandler(e -> log.error("source {} -- {} exception occurred", src.localAddress(), src.remoteAddress(), e))
43+
.closeHandler(v -> log.debug("source {} -- {} closed", src.localAddress(), src.remoteAddress()));
44+
src.handler(new MuxMessageParser(muxMsg -> this.bindMuxConnections(src, muxMsg), src));
45+
src.resume();
46+
}
47+
48+
/**
49+
* 根据{@code MuxMessage }建立后端连接,并将数据连接和后端连接进行绑定
50+
*/
51+
protected void bindMuxConnections(NetSocket src, MuxMessageParser.MuxMessage muxMsg) {
52+
src.pause();
53+
NetAddress backend = aesBase64Decode(muxMsg.backendServerBuf);
54+
if (backend == null) {
55+
log.warn("source {} -- {} exception occurred: failed to parsing the backendServer address from encrypted content:{}",
56+
src.localAddress(), src.remoteAddress(),
57+
muxMsg.backendServerBuf);
58+
src.close();
59+
return;
60+
}
61+
netClient.connect(backend.getPort(), backend.getHost())
62+
.onFailure(e -> {
63+
log.error("source {} -- {} failed to connect to {}", src.localAddress(), src.remoteAddress(), backend, e);
64+
src.close();
65+
})
66+
.onSuccess(dst -> {
67+
dst.pause();
68+
log.debug("target {} -- {} connected",dst.localAddress(),dst.remoteAddress());
69+
70+
});
71+
2472
}
2573

2674
public static String generateName() {
@@ -42,4 +90,20 @@ public static String generateName() {
4290
}
4391
}
4492

93+
public void start() {
94+
netServer.connectHandler(this::handleConnect)
95+
.exceptionHandler(e -> log.error("{} socket errors happening before the connection is passed to the connectHandler", name, e))
96+
.listen(port, host)
97+
.onFailure(e -> log.error("{} start failed", name))
98+
.onSuccess(v -> log.info("{} started on {}:{}", name, host, port));
99+
}
100+
101+
public void stop() {
102+
netServer.close()
103+
.onSuccess(v -> log.info("{} closed", name))
104+
.onFailure(e -> log.error("{} close failed", name, e));
105+
106+
}
107+
108+
45109
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package top.meethigher.proxy.tcp.mux;
2+
3+
import io.vertx.core.Vertx;
4+
import io.vertx.core.buffer.Buffer;
5+
import io.vertx.core.net.NetClient;
6+
import io.vertx.core.net.NetSocket;
7+
import io.vertx.core.parsetools.RecordParser;
8+
import org.junit.Test;
9+
import top.meethigher.proxy.NetAddress;
10+
11+
import java.util.concurrent.locks.LockSupport;
12+
13+
public class MuxMessageParserTest {
14+
15+
16+
/**
17+
* 我想验证,如果一次性发了一大串内容。RecordParser可以只读取前1个字节,就pause,当resume可以继续从第2个之后消费
18+
* <p>
19+
* 结论:不行
20+
*/
21+
@Test
22+
public void testRecordParserAndPause() {
23+
Vertx vertx = Vertx.vertx();
24+
vertx.createNetServer().connectHandler(socket -> {
25+
socket.pause();
26+
RecordParser parser = RecordParser.newFixed(1);
27+
parser.handler(buf -> {
28+
System.out.println(buf);
29+
socket.pause();
30+
vertx.setTimer(5000, id -> {
31+
socket.handler(buf1 -> {
32+
System.out.println(buf1);
33+
});
34+
socket.resume();
35+
});
36+
});
37+
socket.handler(parser);
38+
socket.resume();
39+
}).listen(777).onFailure(e -> System.exit(1));
40+
41+
NetClient netClient = vertx.createNetClient();
42+
netClient.connect(777, "127.0.0.1").onSuccess(socket -> {
43+
socket.write("123456789");
44+
vertx.setTimer(1000, id -> {
45+
socket.write("abcdefg");
46+
});
47+
});
48+
49+
LockSupport.park();
50+
}
51+
52+
@Test
53+
public void name() {
54+
55+
NetSocket socket = null;
56+
57+
Vertx vertx = Vertx.vertx();
58+
Mux mux = new Mux(vertx, Mux.SECRET_DEFAULT) {
59+
};
60+
NetAddress netAddress1 = new NetAddress("127.0.0.1", 8080);
61+
62+
63+
Buffer buffer1 = mux.aesBase64Encode(netAddress1);
64+
65+
66+
Buffer buffer2 = Buffer.buffer("halo wode");
67+
68+
Buffer buffer = Buffer.buffer().appendBuffer(buffer1).appendBuffer(buffer2);
69+
70+
71+
MuxMessageParser parser = new MuxMessageParser(msg -> {
72+
System.out.println("控制消息:" + mux.aesBase64Decode(msg.backendServerBuf));
73+
System.out.println("用户消息:" + msg.payload);
74+
}, socket);
75+
76+
parser.handle(buffer);
77+
78+
79+
LockSupport.park();
80+
}
81+
}

0 commit comments

Comments
 (0)