Skip to content

Commit 9df1714

Browse files
committed
feat: 针对client与server,将业务逻辑和通用逻辑进行抽象类提取
1 parent aecdf50 commit 9df1714

File tree

7 files changed

+420
-218
lines changed

7 files changed

+420
-218
lines changed
Lines changed: 68 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,13 @@
11
package top.meethigher.proxy.tcp.tunnel;
22

3-
import io.vertx.core.AsyncResult;
4-
import io.vertx.core.Handler;
53
import io.vertx.core.Vertx;
6-
import io.vertx.core.buffer.Buffer;
74
import io.vertx.core.net.NetClient;
85
import io.vertx.core.net.NetSocket;
96
import org.slf4j.Logger;
107
import org.slf4j.LoggerFactory;
11-
import top.meethigher.proxy.tcp.tunnel.codec.TunnelMessageCodec;
12-
import top.meethigher.proxy.tcp.tunnel.codec.TunnelMessageParser;
138
import top.meethigher.proxy.tcp.tunnel.codec.TunnelMessageType;
14-
import top.meethigher.proxy.tcp.tunnel.handler.TunnelHandler;
9+
import top.meethigher.proxy.tcp.tunnel.handler.AbstractTunnelHandler;
10+
import top.meethigher.proxy.tcp.tunnel.proto.TunnelMessage;
1511

1612
import java.util.concurrent.ThreadLocalRandom;
1713

@@ -28,47 +24,20 @@
2824
* @author <a href="https://meethigher.top">chenchuancheng</a>
2925
* @since 2025/04/01 23:25
3026
*/
31-
public class ReverseTcpProxyTunnelClient extends Tunnel {
27+
public class ReverseTcpProxyTunnelClient extends TunnelClient {
3228
private static final Logger log = LoggerFactory.getLogger(ReverseTcpProxyTunnelClient.class);
3329

3430

35-
protected static final long MIN_DELAY_DEFAULT = 1000;
36-
protected static final long MAX_DELAY_DEFAULT = 64000;
31+
protected static final long HEARTBEAT_DELAY_DEFAULT = 5000;// 毫秒
32+
protected static final long MIN_DELAY_DEFAULT = 1000;// 毫秒
33+
protected static final long MAX_DELAY_DEFAULT = 64000;// 毫秒
34+
protected static final String TOKEN_DEFAULT = "123456789";
3735
protected static final char[] ID_CHARACTERS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray();
3836

3937

40-
/**
41-
* 连接的目标控制主机
42-
*/
43-
protected String controlHost = "127.0.0.1";
44-
45-
/**
46-
* 连接的目标控制端口
47-
*/
48-
protected int controlPort = 44444;
49-
50-
/**
51-
* 失败重连的时间间隔,单位毫秒
52-
*/
53-
protected long reconnectDelay;
54-
55-
/**
56-
* 内部维护一个长连接socket
57-
*/
58-
protected NetSocket netSocket;
59-
60-
protected final Vertx vertx;
61-
protected final NetClient netClient;
38+
protected final long heartbeatDelay;
39+
protected final String token;
6240
protected final String name;
63-
/**
64-
* Client进行失败重连时的最短间隔时间,单位毫秒
65-
*/
66-
protected final long minDelay;
67-
68-
/**
69-
* Client进行失败重连时的最大间隔时间,单位毫秒
70-
*/
71-
protected final long maxDelay;
7241

7342

7443
protected static String generateName() {
@@ -90,118 +59,79 @@ protected static String generateName() {
9059
}
9160
}
9261

93-
protected ReverseTcpProxyTunnelClient(Vertx vertx, NetClient netClient, String name, long minDelay, long maxDelay) {
94-
this.vertx = vertx;
95-
this.netClient = netClient;
62+
protected ReverseTcpProxyTunnelClient(Vertx vertx, NetClient netClient,
63+
long minDelay, long maxDelay, long heartbeatDelay,
64+
String token, String name) {
65+
super(vertx, netClient, minDelay, maxDelay);
66+
this.heartbeatDelay = heartbeatDelay;
67+
this.token = token;
9668
this.name = name;
97-
this.minDelay = minDelay;
98-
this.maxDelay = maxDelay;
99-
this.reconnectDelay = this.minDelay;
100-
}
101-
102-
public static ReverseTcpProxyTunnelClient create(Vertx vertx, NetClient netClient, long minDelay, long maxDelay, String name) {
103-
return new ReverseTcpProxyTunnelClient(vertx, netClient, name, minDelay, maxDelay);
104-
}
105-
106-
public static ReverseTcpProxyTunnelClient create(Vertx vertx, NetClient netClient, String name) {
107-
return new ReverseTcpProxyTunnelClient(vertx, netClient, name, MIN_DELAY_DEFAULT, MAX_DELAY_DEFAULT);
108-
}
109-
110-
public static ReverseTcpProxyTunnelClient create(Vertx vertx, NetClient netClient) {
111-
return new ReverseTcpProxyTunnelClient(vertx, netClient, generateName(), MIN_DELAY_DEFAULT, MAX_DELAY_DEFAULT);
69+
addMessageHandler();
11270
}
11371

114-
public static ReverseTcpProxyTunnelClient create(Vertx vertx) {
115-
return new ReverseTcpProxyTunnelClient(vertx, vertx.createNetClient(), generateName(), MIN_DELAY_DEFAULT, MAX_DELAY_DEFAULT);
116-
}
117-
118-
119-
public void connect(String host, int port) {
120-
this.controlHost = host;
121-
this.controlPort = port;
122-
log.debug("client connect {}:{} ...", this.controlHost, this.controlPort);
123-
124-
Handler<AsyncResult<NetSocket>> asyncResultHandler = ar -> {
125-
if (ar.succeeded()) {
126-
setReconnectDelay(this.minDelay);
127-
NetSocket socket = ar.result();
128-
this.netSocket = socket;
129-
socket.pause();
130-
socket.closeHandler(v -> {
131-
log.debug("closed {} -- {}, after {} ms will reconnect",
132-
socket.localAddress(),
133-
socket.remoteAddress(),
134-
reconnectDelay);
135-
reconnect();
136-
});
137-
socket.handler(decode(socket));
138-
log.info("client connected {}:{}", controlHost, controlPort);
139-
TunnelHandler tunnelHandler = tunnelHandlers.get(null);
140-
if (tunnelHandler != null) {
141-
tunnelHandler.handle(vertx, socket, null);
72+
/**
73+
* 注册内网穿透的监听逻辑
74+
*/
75+
protected void addMessageHandler() {
76+
// 监听连接成功事件
77+
this.onConnected((vertx, netSocket, buffer) -> emit(TunnelMessageType.AUTH, TunnelMessage.Auth
78+
.newBuilder()
79+
.setToken(token)
80+
.build()
81+
.toByteArray()));
82+
// 监听授权响应事件
83+
this.on(TunnelMessageType.AUTH_ACK, new AbstractTunnelHandler() {
84+
@Override
85+
protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType type, byte[] bodyBytes) {
86+
boolean result = false;
87+
try {
88+
TunnelMessage.AuthAck ack = TunnelMessage.AuthAck.parseFrom(bodyBytes);
89+
result = ack.getSuccess();
90+
if (result) {
91+
vertx.setTimer(heartbeatDelay, id -> emit(TunnelMessageType.HEARTBEAT,
92+
TunnelMessage.Heartbeat.newBuilder()
93+
.setTimestamp(System.currentTimeMillis())
94+
.build()
95+
.toByteArray()));
96+
}
97+
} catch (Exception e) {
98+
}
99+
return result;
100+
}
101+
});
102+
// 监听心跳响应事件
103+
this.on(TunnelMessageType.HEARTBEAT_ACK, new AbstractTunnelHandler() {
104+
@Override
105+
protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType type, byte[] bodyBytes) {
106+
try {
107+
vertx.setTimer(heartbeatDelay, id -> emit(TunnelMessageType.HEARTBEAT,
108+
TunnelMessage.Heartbeat.newBuilder()
109+
.setTimestamp(System.currentTimeMillis())
110+
.build()
111+
.toByteArray()));
112+
} catch (Exception e) {
142113
}
143-
socket.resume();
144-
} else {
145-
Throwable e = ar.cause();
146-
log.error("client connect {}:{} error, after {} ms will reconnect",
147-
host,
148-
port,
149-
reconnectDelay,
150-
e);
151-
reconnect();
114+
return true;
152115
}
153-
};
154-
netClient.connect(this.controlPort, this.controlHost).onComplete(asyncResultHandler);
116+
});
155117
}
156118

157-
public void emit(Buffer buffer) {
158-
if (netSocket == null) {
159-
log.warn("socket is closed");
160-
} else {
161-
netSocket.write(buffer);
162-
}
119+
public static ReverseTcpProxyTunnelClient create(Vertx vertx, NetClient netClient, long minDelay, long maxDelay, long heartbeatDelay, String token, String name) {
120+
return new ReverseTcpProxyTunnelClient(vertx, netClient, minDelay, maxDelay, heartbeatDelay, token, name);
163121
}
164122

165-
166-
@Override
167-
public void onConnected(TunnelHandler tunnelHandler) {
168-
tunnelHandlers.put(null, tunnelHandler);
123+
public static ReverseTcpProxyTunnelClient create(Vertx vertx, NetClient netClient, String token) {
124+
return new ReverseTcpProxyTunnelClient(vertx, netClient, MIN_DELAY_DEFAULT, MAX_DELAY_DEFAULT, HEARTBEAT_DELAY_DEFAULT, token, generateName());
169125
}
170126

171-
@Override
172-
public void on(TunnelMessageType type, TunnelHandler tunnelHandler) {
173-
tunnelHandlers.put(type, tunnelHandler);
174-
}
175127

176-
@Override
177-
public TunnelMessageParser decode(NetSocket socket) {
178-
return new TunnelMessageParser(buffer -> {
179-
TunnelMessageCodec.DecodedMessage decodedMessage = TunnelMessageCodec.decode(buffer);
180-
TunnelMessageType type = TunnelMessageType.fromCode(decodedMessage.type);
181-
for (TunnelMessageType tunnelMessageType : tunnelHandlers.keySet()) {
182-
if (type == tunnelMessageType) {
183-
TunnelHandler tunnelHandler = tunnelHandlers.get(tunnelMessageType);
184-
if (tunnelHandler != null) {
185-
tunnelHandler.handle(vertx, socket, buffer);
186-
}
187-
}
188-
}
189-
}, socket);
128+
public static ReverseTcpProxyTunnelClient create(Vertx vertx, NetClient netClient) {
129+
return new ReverseTcpProxyTunnelClient(vertx, netClient, MIN_DELAY_DEFAULT, MAX_DELAY_DEFAULT, HEARTBEAT_DELAY_DEFAULT, TOKEN_DEFAULT, generateName());
190130
}
191131

192-
protected void setReconnectDelay(long delay) {
193-
this.reconnectDelay = delay;
132+
public static ReverseTcpProxyTunnelClient create(Vertx vertx) {
133+
return new ReverseTcpProxyTunnelClient(vertx, vertx.createNetClient(), MIN_DELAY_DEFAULT, MAX_DELAY_DEFAULT, HEARTBEAT_DELAY_DEFAULT, TOKEN_DEFAULT, generateName());
194134
}
195135

196-
/**
197-
* 采用指数退避策略进行失败重连
198-
*/
199-
protected void reconnect() {
200-
netSocket = null;
201-
vertx.setTimer(reconnectDelay, id -> {
202-
log.info("client reconnect {}:{} ...", this.controlHost, this.controlPort);
203-
connect(this.controlHost, this.controlPort);
204-
setReconnectDelay(Math.min(reconnectDelay * 2, this.maxDelay));
205-
});
206-
}
136+
207137
}

0 commit comments

Comments
 (0)