Skip to content

Commit aecdf50

Browse files
committed
feat: 1. 参考socket.io-java-client初步封装tunnel 2. 添加单元测试
1 parent c0393fa commit aecdf50

File tree

11 files changed

+514
-134
lines changed

11 files changed

+514
-134
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ ReverseTcpProxy.create(Vertx.vertx(), "10.0.0.1", 8080)
4242

4343
虚线表示连接通信。
4444

45+
一些代码上的设计思路,参考[socket.io-client-java](https://github.com/socketio/socket.io-client-java/blob/socket.io-client-2.1.0/src/main/java/io/socket/client/Socket.java)
46+
4547
```mermaid
4648
sequenceDiagram
4749
participant user as User
Lines changed: 196 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,207 @@
11
package top.meethigher.proxy.tcp.tunnel;
22

3+
import io.vertx.core.AsyncResult;
4+
import io.vertx.core.Handler;
5+
import io.vertx.core.Vertx;
6+
import io.vertx.core.buffer.Buffer;
7+
import io.vertx.core.net.NetClient;
8+
import io.vertx.core.net.NetSocket;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
import top.meethigher.proxy.tcp.tunnel.codec.TunnelMessageCodec;
12+
import top.meethigher.proxy.tcp.tunnel.codec.TunnelMessageParser;
13+
import top.meethigher.proxy.tcp.tunnel.codec.TunnelMessageType;
14+
import top.meethigher.proxy.tcp.tunnel.handler.TunnelHandler;
15+
16+
import java.util.concurrent.ThreadLocalRandom;
17+
318
/**
4-
* 背景:我近期买了个树莓派,但是又不想随身带着树莓派,因此希望可以公网访问。
19+
* 一个{@code ReverseTcpProxyTunnelClient} 对应一个失败重连的 TCP 连接。如果需要多个 TCP 连接,那么就需要创建多个 {@code ReverseTcpProxyTunnelClient} 实例
20+
*
21+
* <p>背景:</p><p>我近期买了个树莓派,但是又不想随身带着树莓派,因此希望可以公网访问。</p>
22+
* <p>
523
* 但是使用<a href="https://github.com/fatedier/frp">fatedier/frp</a>的过程中,不管在Windows还是Linux,都被扫出病毒了。
624
* 而且这还是Golang自身的问题,参考<a href="https://go.dev/doc/faq#virus">Why does my virus-scanning software think my Go distribution or compiled binary is infected?</a>
725
* 因此自己使用Java实现一套类似frp的工具,还是很有必要的。
26+
* </p>
827
*
928
* @author <a href="https://meethigher.top">chenchuancheng</a>
1029
* @since 2025/04/01 23:25
1130
*/
12-
public class ReverseTcpProxyTunnelClient {
31+
public class ReverseTcpProxyTunnelClient extends Tunnel {
32+
private static final Logger log = LoggerFactory.getLogger(ReverseTcpProxyTunnelClient.class);
33+
34+
35+
protected static final long MIN_DELAY_DEFAULT = 1000;
36+
protected static final long MAX_DELAY_DEFAULT = 64000;
37+
protected static final char[] ID_CHARACTERS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray();
38+
39+
40+
/**
41+
* 连接的目标控制主机
42+
*/
43+
protected String controlHost = "127.0.0.1";
44+
45+
/**
46+
* 连接的目标控制端口
47+
*/
48+
protected int controlPort = 44444;
49+
50+
/**
51+
* 失败重连的时间间隔,单位毫秒
52+
*/
53+
protected long reconnectDelay;
54+
55+
/**
56+
* 内部维护一个长连接socket
57+
*/
58+
protected NetSocket netSocket;
59+
60+
protected final Vertx vertx;
61+
protected final NetClient netClient;
62+
protected final String name;
63+
/**
64+
* Client进行失败重连时的最短间隔时间,单位毫秒
65+
*/
66+
protected final long minDelay;
67+
68+
/**
69+
* Client进行失败重连时的最大间隔时间,单位毫秒
70+
*/
71+
protected final long maxDelay;
72+
73+
74+
protected static String generateName() {
75+
final String prefix = "ReverseTcpProxyTunnelClient-";
76+
try {
77+
// 池号对于虚拟机来说是全局的,以避免在类加载器范围的环境中池号重叠
78+
synchronized (System.getProperties()) {
79+
final String next = String.valueOf(Integer.getInteger("top.meethigher.proxy.tcp.tunnel.ReverseTcpProxyTunnelClient.name", 0) + 1);
80+
System.setProperty("top.meethigher.proxy.tcp.tunnel.ReverseTcpProxyTunnelClient.name", next);
81+
return prefix + next;
82+
}
83+
} catch (Exception e) {
84+
final ThreadLocalRandom random = ThreadLocalRandom.current();
85+
final StringBuilder sb = new StringBuilder(prefix);
86+
for (int i = 0; i < 4; i++) {
87+
sb.append(ID_CHARACTERS[random.nextInt(62)]);
88+
}
89+
return sb.toString();
90+
}
91+
}
92+
93+
protected ReverseTcpProxyTunnelClient(Vertx vertx, NetClient netClient, String name, long minDelay, long maxDelay) {
94+
this.vertx = vertx;
95+
this.netClient = netClient;
96+
this.name = name;
97+
this.minDelay = minDelay;
98+
this.maxDelay = maxDelay;
99+
this.reconnectDelay = this.minDelay;
100+
}
101+
102+
public static ReverseTcpProxyTunnelClient create(Vertx vertx, NetClient netClient, long minDelay, long maxDelay, String name) {
103+
return new ReverseTcpProxyTunnelClient(vertx, netClient, name, minDelay, maxDelay);
104+
}
105+
106+
public static ReverseTcpProxyTunnelClient create(Vertx vertx, NetClient netClient, String name) {
107+
return new ReverseTcpProxyTunnelClient(vertx, netClient, name, MIN_DELAY_DEFAULT, MAX_DELAY_DEFAULT);
108+
}
109+
110+
public static ReverseTcpProxyTunnelClient create(Vertx vertx, NetClient netClient) {
111+
return new ReverseTcpProxyTunnelClient(vertx, netClient, generateName(), MIN_DELAY_DEFAULT, MAX_DELAY_DEFAULT);
112+
}
113+
114+
public static ReverseTcpProxyTunnelClient create(Vertx vertx) {
115+
return new ReverseTcpProxyTunnelClient(vertx, vertx.createNetClient(), generateName(), MIN_DELAY_DEFAULT, MAX_DELAY_DEFAULT);
116+
}
117+
118+
119+
public void connect(String host, int port) {
120+
this.controlHost = host;
121+
this.controlPort = port;
122+
log.debug("client connect {}:{} ...", this.controlHost, this.controlPort);
123+
124+
Handler<AsyncResult<NetSocket>> asyncResultHandler = ar -> {
125+
if (ar.succeeded()) {
126+
setReconnectDelay(this.minDelay);
127+
NetSocket socket = ar.result();
128+
this.netSocket = socket;
129+
socket.pause();
130+
socket.closeHandler(v -> {
131+
log.debug("closed {} -- {}, after {} ms will reconnect",
132+
socket.localAddress(),
133+
socket.remoteAddress(),
134+
reconnectDelay);
135+
reconnect();
136+
});
137+
socket.handler(decode(socket));
138+
log.info("client connected {}:{}", controlHost, controlPort);
139+
TunnelHandler tunnelHandler = tunnelHandlers.get(null);
140+
if (tunnelHandler != null) {
141+
tunnelHandler.handle(vertx, socket, null);
142+
}
143+
socket.resume();
144+
} else {
145+
Throwable e = ar.cause();
146+
log.error("client connect {}:{} error, after {} ms will reconnect",
147+
host,
148+
port,
149+
reconnectDelay,
150+
e);
151+
reconnect();
152+
}
153+
};
154+
netClient.connect(this.controlPort, this.controlHost).onComplete(asyncResultHandler);
155+
}
156+
157+
public void emit(Buffer buffer) {
158+
if (netSocket == null) {
159+
log.warn("socket is closed");
160+
} else {
161+
netSocket.write(buffer);
162+
}
163+
}
164+
165+
166+
@Override
167+
public void onConnected(TunnelHandler tunnelHandler) {
168+
tunnelHandlers.put(null, tunnelHandler);
169+
}
170+
171+
@Override
172+
public void on(TunnelMessageType type, TunnelHandler tunnelHandler) {
173+
tunnelHandlers.put(type, tunnelHandler);
174+
}
175+
176+
@Override
177+
public TunnelMessageParser decode(NetSocket socket) {
178+
return new TunnelMessageParser(buffer -> {
179+
TunnelMessageCodec.DecodedMessage decodedMessage = TunnelMessageCodec.decode(buffer);
180+
TunnelMessageType type = TunnelMessageType.fromCode(decodedMessage.type);
181+
for (TunnelMessageType tunnelMessageType : tunnelHandlers.keySet()) {
182+
if (type == tunnelMessageType) {
183+
TunnelHandler tunnelHandler = tunnelHandlers.get(tunnelMessageType);
184+
if (tunnelHandler != null) {
185+
tunnelHandler.handle(vertx, socket, buffer);
186+
}
187+
}
188+
}
189+
}, socket);
190+
}
191+
192+
protected void setReconnectDelay(long delay) {
193+
this.reconnectDelay = delay;
194+
}
195+
196+
/**
197+
* 采用指数退避策略进行失败重连
198+
*/
199+
protected void reconnect() {
200+
netSocket = null;
201+
vertx.setTimer(reconnectDelay, id -> {
202+
log.info("client reconnect {}:{} ...", this.controlHost, this.controlPort);
203+
connect(this.controlHost, this.controlPort);
204+
setReconnectDelay(Math.min(reconnectDelay * 2, this.maxDelay));
205+
});
206+
}
13207
}

0 commit comments

Comments
 (0)