Skip to content

Commit d3d2c9a

Browse files
committed
feat: 代码封装上的微调
1 parent 9ec5b35 commit d3d2c9a

File tree

4 files changed

+193
-75
lines changed

4 files changed

+193
-75
lines changed

src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelServer.java

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,13 @@
1414
import top.meethigher.proxy.tcp.tunnel.handler.TunnelHandler;
1515
import top.meethigher.proxy.tcp.tunnel.proto.TunnelMessage;
1616

17-
import java.util.LinkedHashMap;
1817
import java.util.Map;
18+
import java.util.concurrent.ConcurrentHashMap;
1919
import java.util.concurrent.CountDownLatch;
2020
import java.util.concurrent.ThreadLocalRandom;
2121
import java.util.concurrent.atomic.AtomicBoolean;
2222

2323
/**
24-
*
25-
*
2624
* <p>背景:</p><p>我近期买了个树莓派,但是又不想随身带着树莓派,因此希望可以公网访问。</p>
2725
* <p>
2826
* 但是使用<a href="https://github.com/fatedier/frp">fatedier/frp</a>的过程中,不管在Windows还是Linux,都被扫出病毒了。
@@ -42,35 +40,35 @@ public class ReverseTcpProxyTunnelServer extends TunnelServer {
4240

4341
protected String host = "0.0.0.0";
4442
protected int port = 44444;
45-
protected Map<NetSocket, DataProxyServer> authedSockets = new LinkedHashMap<>();// 授权成功的socket列表
43+
protected Map<NetSocket, DataProxyServer> authedSockets = new ConcurrentHashMap<>();// 授权成功的socket列表
4644

4745
protected final String secret;
4846
protected final String name;
49-
protected final Handler<NetSocket> connectHandler;
5047

5148
public ReverseTcpProxyTunnelServer(Vertx vertx, NetServer netServer, String secret, String name) {
5249
super(vertx, netServer);
5350
this.secret = secret;
5451
this.name = name;
55-
this.connectHandler = socket -> {
56-
socket.pause();
57-
socket.handler(decode(socket));
58-
socket.closeHandler(v -> {
59-
log.debug("closed {} -- {}", socket.remoteAddress(), socket.localAddress());
60-
DataProxyServer removed = authedSockets.remove(socket);
61-
if (removed != null) {
62-
removed.stop();
63-
}
64-
});
65-
TunnelHandler connectedHandler = tunnelHandlers.get(null);
66-
if (connectedHandler != null) {
67-
connectedHandler.handle(vertx, socket, Buffer.buffer());
68-
}
69-
socket.resume();
70-
};
7152
addMessageHandler();
7253
}
7354

55+
protected void handleConnect(NetSocket socket) {
56+
socket.pause();
57+
socket.handler(decode(socket));
58+
socket.closeHandler(v -> {
59+
log.debug("closed {} -- {}", socket.remoteAddress(), socket.localAddress());
60+
DataProxyServer removed = authedSockets.remove(socket);
61+
if (removed != null) {
62+
removed.stop();
63+
}
64+
});
65+
TunnelHandler connectedHandler = tunnelHandlers.get(null);
66+
if (connectedHandler != null) {
67+
connectedHandler.handle(vertx, socket, Buffer.buffer());
68+
}
69+
socket.resume();
70+
}
71+
7472
/**
7573
* 注册内网穿透的监听逻辑
7674
*/
@@ -180,7 +178,7 @@ public void start() {
180178
log.error("{} start failed", name, e);
181179
}
182180
};
183-
netServer.connectHandler(connectHandler).exceptionHandler(e -> log.error("connect failed", e));
181+
netServer.connectHandler(this::handleConnect).exceptionHandler(e -> log.error("connect failed", e));
184182
netServer.listen(port, host).onComplete(asyncResultHandler);
185183
}
186184

src/main/java/top/meethigher/proxy/tcp/tunnel/TunnelClient.java

Lines changed: 45 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package top.meethigher.proxy.tcp.tunnel;
22

33
import io.vertx.core.AsyncResult;
4-
import io.vertx.core.Handler;
54
import io.vertx.core.Vertx;
65
import io.vertx.core.net.NetClient;
76
import io.vertx.core.net.NetSocket;
@@ -81,42 +80,7 @@ public TunnelMessageParser decode(final NetSocket socket) {
8180
*/
8281
public void connect(final String host, final int port) {
8382
log.debug("client connect {}:{} ...", host, port);
84-
Handler<AsyncResult<NetSocket>> asyncResultHandler = ar -> {
85-
if (ar.succeeded()) {
86-
setReconnectDelay(this.minDelay);
87-
88-
NetSocket socket = ar.result();
89-
this.netSocket = socket;
90-
socket.pause();
91-
socket.closeHandler(v -> {
92-
log.warn("closed {} -- {}, after {} ms will reconnect",
93-
socket.localAddress(),
94-
socket.remoteAddress(),
95-
reconnectDelay);
96-
this.netSocket = null;
97-
reconnect(host, port);
98-
});
99-
socket.handler(decode(socket));
100-
log.debug("client connected {}:{}", host, port);
101-
102-
// 执行连接成功的Handler
103-
TunnelHandler tunnelHandler = tunnelHandlers.get(null);
104-
if (tunnelHandler != null) {
105-
tunnelHandler.handle(vertx, socket, null);
106-
}
107-
socket.resume();
108-
} else {
109-
Throwable e = ar.cause();
110-
log.error("client connect {}:{} error, after {} ms will reconnect",
111-
host,
112-
port,
113-
reconnectDelay,
114-
e);
115-
this.netSocket = null;
116-
reconnect(host, port);
117-
}
118-
};
119-
netClient.connect(port, host).onComplete(asyncResultHandler);
83+
netClient.connect(port, host).onComplete(ar -> handleConnectCompleteAsyncResult(ar, host, port));
12084
}
12185

12286
/**
@@ -133,6 +97,50 @@ public void emit(final TunnelMessageType type, final byte[] bodyBytes) {
13397
}
13498
}
13599

100+
/**
101+
* client完成连接后的业务逻辑
102+
*
103+
* @param ar 完成结果
104+
* @param host 连接主机地址
105+
* @param port 连接端口
106+
*/
107+
protected void handleConnectCompleteAsyncResult(final AsyncResult<NetSocket> ar,
108+
final String host, final int port) {
109+
if (ar.succeeded()) {
110+
setReconnectDelay(this.minDelay);
111+
112+
NetSocket socket = ar.result();
113+
this.netSocket = socket;
114+
socket.pause();
115+
socket.closeHandler(v -> {
116+
log.warn("closed {} -- {}, after {} ms will reconnect",
117+
socket.localAddress(),
118+
socket.remoteAddress(),
119+
reconnectDelay);
120+
this.netSocket = null;
121+
reconnect(host, port);
122+
});
123+
socket.handler(decode(socket));
124+
log.debug("client connected {}:{}", host, port);
125+
126+
// 执行连接成功的Handler
127+
TunnelHandler tunnelHandler = tunnelHandlers.get(null);
128+
if (tunnelHandler != null) {
129+
tunnelHandler.handle(vertx, socket, null);
130+
}
131+
socket.resume();
132+
} else {
133+
Throwable e = ar.cause();
134+
log.error("client connect {}:{} error, after {} ms will reconnect",
135+
host,
136+
port,
137+
reconnectDelay,
138+
e);
139+
this.netSocket = null;
140+
reconnect(host, port);
141+
}
142+
}
143+
136144
protected void setReconnectDelay(final long delay) {
137145
this.reconnectDelay = delay;
138146
}

0 commit comments

Comments
 (0)