Skip to content

Commit 5485577

Browse files
committed
feat: 1. protobuf封装tcp通信编码规范 2. 自定义tcp解码逻辑
(cherry picked from commit 6d860af)
1 parent 2f0ef00 commit 5485577

File tree

11 files changed

+2728
-0
lines changed

11 files changed

+2728
-0
lines changed
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package top.meethigher.proxy.tcp.tunnel.codec;
2+
3+
import io.vertx.core.buffer.Buffer;
4+
5+
/**
6+
* 自定义消息编解码
7+
* <pre>
8+
* | 4 字节(消息长度) | 2 字节(消息类型) | 变长(消息体) |
9+
* | 0x0010 | 0x0001 | {"id":1, "msg":"Hello"} |
10+
* </pre>
11+
*
12+
* @author <a href="https://meethigher.top">chenchuancheng</a>
13+
* @since 2025/04/04 23:12
14+
*/
15+
public class TunnelMessageCodec {
16+
17+
18+
public static Buffer encode(short type, byte[] body) {
19+
int totalLength = 4 + 2 + body.length;
20+
Buffer buffer = Buffer.buffer();
21+
buffer.appendInt(totalLength);
22+
buffer.appendShort(type);
23+
buffer.appendBytes(body);
24+
return buffer;
25+
}
26+
27+
public static DecodedMessage decode(Buffer buffer) {
28+
int totalLength = buffer.getInt(0);
29+
short type = buffer.getShort(4);
30+
byte[] body = buffer.getBytes(6, buffer.length());
31+
return new DecodedMessage(totalLength, type, body);
32+
}
33+
34+
public static class DecodedMessage {
35+
public final int totalLength;
36+
public final short type;
37+
public final byte[] body;
38+
39+
public DecodedMessage(int totalLength, short type, byte[] body) {
40+
this.totalLength = totalLength;
41+
this.type = type;
42+
this.body = body;
43+
}
44+
}
45+
46+
47+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package top.meethigher.proxy.tcp.tunnel.codec;
2+
3+
import io.vertx.core.Handler;
4+
import io.vertx.core.buffer.Buffer;
5+
6+
/**
7+
* 自定义消息结构解析器
8+
* [4字节长度+2字节类型+protobuf变长消息体]
9+
*
10+
* @author <a href="https://meethigher.top">chenchuancheng</a>
11+
* @since 2025/04/04 02:52
12+
*/
13+
public class TunnelMessageParser implements Handler<Buffer> {
14+
15+
private Buffer buf = Buffer.buffer();
16+
17+
/**
18+
* 预设长度起始位置
19+
*/
20+
private final int lengthFieldOffset = 0;
21+
22+
/**
23+
* 预设长度占用的字节数
24+
*/
25+
private final int lengthFieldLength = 4;
26+
/**
27+
* 消息类型起始位置
28+
*/
29+
private final int typeFieldOffset = 4;
30+
31+
/**
32+
* 消息类型占用的字节数
33+
*/
34+
private final int typeFieldLength = 2;
35+
36+
/**
37+
* 消息体起始位置
38+
*/
39+
private final int bodyFieldOffset = 6;
40+
41+
private final Handler<Buffer> outputHandler;
42+
43+
public TunnelMessageParser(Handler<Buffer> outputHandler) {
44+
this.outputHandler = outputHandler;
45+
}
46+
47+
@Override
48+
public void handle(Buffer buffer) {
49+
buf.appendBuffer(buffer);
50+
if (buf.length() < lengthFieldLength) {
51+
return;
52+
} else {
53+
int totalLength = buf.getInt(lengthFieldOffset);
54+
if (buf.length() < totalLength) {
55+
return;
56+
} else {
57+
outputHandler.handle(buf.getBuffer(0, totalLength));
58+
buf = buf.getBuffer(totalLength, buf.length());
59+
}
60+
}
61+
62+
}
63+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package top.meethigher.proxy.tcp.tunnel.codec;
2+
3+
/**
4+
* 消息类型
5+
*
6+
* @author <a href="https://meethigher.top">chenchuancheng</a>
7+
* @since 2025/04/04 23:11
8+
*/
9+
public enum TunnelMessageType {
10+
// 客户端 → 服务端
11+
AUTH(0x0001),
12+
HEARTBEAT(0x0003),
13+
OPEN_PORT(0x0005),
14+
CONNECT_PORT_ACK(0x0008),
15+
16+
// 服务端 → 客户端
17+
AUTH_ACK(0x0002),
18+
HEARTBEAT_ACK(0x0004),
19+
OPEN_PORT_ACK(0x0006),
20+
CONNECT_PORT(0x0007);
21+
22+
private final int code;
23+
24+
TunnelMessageType(int code) {
25+
this.code = code;
26+
}
27+
28+
public short code() {
29+
return (short) code;
30+
}
31+
32+
public static TunnelMessageType fromCode(short code) {
33+
for (TunnelMessageType type : values()) {
34+
if (type.code() == code) {
35+
return type;
36+
}
37+
}
38+
throw new IllegalArgumentException("Unknown message type: " + code);
39+
}
40+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package top.meethigher.proxy.tcp.tunnel.handler;
2+
3+
import io.vertx.core.net.NetSocket;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
import top.meethigher.proxy.tcp.tunnel.codec.TunnelMessageCodec;
7+
import top.meethigher.proxy.tcp.tunnel.codec.TunnelMessageType;
8+
import top.meethigher.proxy.tcp.tunnel.proto.TunnelMessage;
9+
10+
import java.util.concurrent.atomic.AtomicBoolean;
11+
12+
public class AuthHandler extends TunnelHandler {
13+
14+
15+
private static final Logger log = LoggerFactory.getLogger(AuthHandler.class);
16+
17+
public AuthHandler(NetSocket netSocket, TunnelMessageType type, TunnelMessageType respType, byte[] bodyBytes) {
18+
super(netSocket, type, respType, bodyBytes);
19+
}
20+
21+
@Override
22+
protected boolean doHandler() {
23+
TunnelMessage.AuthAck.Builder builder = TunnelMessage.AuthAck.newBuilder();
24+
AtomicBoolean success = new AtomicBoolean(false);
25+
String msg = "";
26+
try {
27+
TunnelMessage.Auth parsed = TunnelMessage.Auth.parseFrom(bodyBytes);
28+
String token = parsed.getToken();
29+
if ("token".equals(token)) {
30+
success.set(true);
31+
} else {
32+
msg = "auth failed";
33+
}
34+
} catch (Exception e) {
35+
msg = e.getMessage();
36+
log.error("Error parsing message: {}", type, e);
37+
} finally {
38+
39+
netSocket.write(TunnelMessageCodec.encode(respType.code(), builder.setSuccess(success.get()).setMessage(msg).build().toByteArray())).onSuccess(v -> {
40+
if (!success.get()) {
41+
netSocket.close();
42+
}
43+
});
44+
}
45+
return success.get();
46+
}
47+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package top.meethigher.proxy.tcp.tunnel.handler;
2+
3+
import io.vertx.core.net.NetSocket;
4+
import top.meethigher.proxy.tcp.tunnel.codec.TunnelMessageCodec;
5+
import top.meethigher.proxy.tcp.tunnel.codec.TunnelMessageType;
6+
import top.meethigher.proxy.tcp.tunnel.proto.TunnelMessage;
7+
8+
public class HeartbeatHandler extends TunnelHandler {
9+
public HeartbeatHandler(NetSocket netSocket, TunnelMessageType type, TunnelMessageType respType, byte[] bodyBytes) {
10+
super(netSocket, type, respType, bodyBytes);
11+
}
12+
13+
@Override
14+
protected boolean doHandler() {
15+
TunnelMessage.HeartbeatAck ack = TunnelMessage.HeartbeatAck.newBuilder().setTimestamp(System.currentTimeMillis()).build();
16+
netSocket.write(TunnelMessageCodec.encode(respType.code(), ack.toByteArray()));
17+
return true;
18+
}
19+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package top.meethigher.proxy.tcp.tunnel.handler;
2+
3+
import io.vertx.core.net.NetSocket;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
import top.meethigher.proxy.tcp.tunnel.codec.TunnelMessageCodec;
7+
import top.meethigher.proxy.tcp.tunnel.codec.TunnelMessageType;
8+
import top.meethigher.proxy.tcp.tunnel.proto.TunnelMessage;
9+
10+
public class OpenPortHandler extends TunnelHandler {
11+
private static final Logger log = LoggerFactory.getLogger(OpenPortHandler.class);
12+
13+
public OpenPortHandler(NetSocket netSocket, TunnelMessageType type, TunnelMessageType respType, byte[] bodyBytes) {
14+
super(netSocket, type, respType, bodyBytes);
15+
}
16+
17+
@Override
18+
protected boolean doHandler() {
19+
TunnelMessage.OpenPortAck.Builder builder = TunnelMessage.OpenPortAck.newBuilder();
20+
boolean success = false;
21+
String msg = "";
22+
try {
23+
TunnelMessage.OpenPort parsed = TunnelMessage.OpenPort.parseFrom(bodyBytes);
24+
int port = parsed.getPort();
25+
log.info("开启 {} 端口", port);
26+
success = true;
27+
28+
} catch (Exception e) {
29+
msg = e.getMessage();
30+
log.error("");
31+
} finally {
32+
netSocket.write(TunnelMessageCodec.encode(respType.code(), builder.setSuccess(success).setMessage(msg).build().toByteArray()));
33+
}
34+
return success;
35+
}
36+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package top.meethigher.proxy.tcp.tunnel.handler;
2+
3+
import io.vertx.core.net.NetSocket;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
import top.meethigher.proxy.tcp.tunnel.codec.TunnelMessageType;
7+
8+
public abstract class TunnelHandler {
9+
10+
private static final Logger log = LoggerFactory.getLogger(TunnelHandler.class);
11+
protected final NetSocket netSocket;
12+
13+
protected final TunnelMessageType type;
14+
15+
protected final TunnelMessageType respType;
16+
17+
protected final byte[] bodyBytes;
18+
19+
public TunnelHandler(NetSocket netSocket, TunnelMessageType type, TunnelMessageType respType, byte[] bodyBytes) {
20+
this.netSocket = netSocket;
21+
this.type = type;
22+
this.respType = respType;
23+
this.bodyBytes = bodyBytes;
24+
}
25+
26+
public void handler() {
27+
boolean result = doHandler();
28+
log.debug("received message type = {}, handle result = {}", type, result);
29+
}
30+
31+
32+
protected abstract boolean doHandler();
33+
34+
35+
}

0 commit comments

Comments
 (0)