Skip to content

Commit 00300b4

Browse files
committed
feat: 1. 调整单元测试时的日志格式 2. 代码微调
(cherry picked from commit 08d1eb84394ac4ead9e76bda14e90552cc974be6)
1 parent 8f2a631 commit 00300b4

File tree

7 files changed

+364
-96
lines changed

7 files changed

+364
-96
lines changed

src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelClient.java

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,23 +26,23 @@
2626
*/
2727
public class ReverseTcpProxyTunnelClient extends TunnelClient {
2828
private static final Logger log = LoggerFactory.getLogger(ReverseTcpProxyTunnelClient.class);
29-
30-
29+
protected static final char[] ID_CHARACTERS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray();
30+
protected static final String SECRET_DEFAULT = "0123456789";
3131
protected static final long HEARTBEAT_DELAY_DEFAULT = 5000;// 毫秒
3232
protected static final long MIN_DELAY_DEFAULT = 1000;// 毫秒
3333
protected static final long MAX_DELAY_DEFAULT = 64000;// 毫秒
34-
protected static final String SECRET_DEFAULT = "123456789";
35-
protected static final char[] ID_CHARACTERS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray();
3634

3735

3836
protected final long heartbeatDelay;
3937
protected final String secret;
4038
protected final String name;
4139

4240

43-
protected String localHost = "127.0.0.1";
44-
protected int localPort = 2222;
45-
protected int remotePort = 2222;
41+
protected String backendHost = "127.0.0.1";
42+
protected int backendPort = 22;
43+
protected String dataProxyHost = "127.0.0.1";
44+
protected int dataProxyPort = 2222;
45+
protected String dataProxyName = "ssh-proxy";
4646

4747

4848
protected static String generateName() {
@@ -74,18 +74,28 @@ protected ReverseTcpProxyTunnelClient(Vertx vertx, NetClient netClient,
7474
addMessageHandler();
7575
}
7676

77-
public ReverseTcpProxyTunnelClient localHost(String localHost) {
78-
this.localHost = localHost;
77+
public ReverseTcpProxyTunnelClient backendHost(String backendHost) {
78+
this.backendHost = backendHost;
79+
return this;
80+
}
81+
82+
public ReverseTcpProxyTunnelClient backendPort(int backendPort) {
83+
this.backendPort = backendPort;
84+
return this;
85+
}
86+
87+
public ReverseTcpProxyTunnelClient dataProxyHost(String dataProxyHost) {
88+
this.dataProxyHost = dataProxyHost;
7989
return this;
8090
}
8191

82-
public ReverseTcpProxyTunnelClient localPort(int localPort) {
83-
this.localPort = localPort;
92+
public ReverseTcpProxyTunnelClient dataProxyPort(int dataProxyPort) {
93+
this.dataProxyPort = dataProxyPort;
8494
return this;
8595
}
8696

87-
public ReverseTcpProxyTunnelClient remotePort(int remotePort) {
88-
this.remotePort = remotePort;
97+
public ReverseTcpProxyTunnelClient dataProxyName(String dataProxyName) {
98+
this.dataProxyName = dataProxyName;
8999
return this;
90100
}
91101

@@ -97,7 +107,8 @@ protected void addMessageHandler() {
97107
this.onConnected((vertx, netSocket, buffer) -> netSocket.write(encode(TunnelMessageType.OPEN_DATA_PORT,
98108
TunnelMessage.OpenDataPort.newBuilder()
99109
.setSecret(secret)
100-
.setPort(remotePort)
110+
.setDataProxyPort(dataProxyPort)
111+
.setDataProxyName(dataProxyName)
101112
.build().toByteArray())));
102113

103114
// 监听授权与开通数据端口事件
@@ -109,11 +120,12 @@ protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType t
109120
TunnelMessage.OpenDataPortAck parsed = TunnelMessage.OpenDataPortAck.parseFrom(bodyBytes);
110121
if (parsed.getSuccess()) {
111122
// 如果认证 + 开通端口成功,那么就需要进行长连接保持,并开启定期心跳。
123+
result = true;
112124
vertx.setTimer(heartbeatDelay, id -> netSocket.write(encode(TunnelMessageType.HEARTBEAT,
113125
TunnelMessage.Heartbeat.newBuilder().setTimestamp(System.currentTimeMillis()).build().toByteArray())));
114126
} else {
115127
// 如果认证失败,服务端会主动关闭 tcp 连接
116-
log.warn("{} error : {}", TunnelMessageType.OPEN_DATA_PORT_ACK, parsed.getMessage());
128+
log.warn("message type {} : {}", TunnelMessageType.OPEN_DATA_PORT_ACK, parsed.getMessage());
117129
}
118130
} catch (Exception e) {
119131
}
@@ -131,6 +143,20 @@ protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType t
131143
return true;
132144
}
133145
});
146+
147+
// 监听数据连接事件
148+
this.on(TunnelMessageType.OPEN_DATA_CONN, new AbstractTunnelHandler() {
149+
@Override
150+
protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType type, byte[] bodyBytes) {
151+
boolean result = false;
152+
try {
153+
TunnelMessage.OpenDataConn parsed = TunnelMessage.OpenDataConn.parseFrom(bodyBytes);
154+
155+
} catch (Exception ignore) {
156+
}
157+
return result;
158+
}
159+
});
134160
}
135161

136162
public static ReverseTcpProxyTunnelClient create(Vertx vertx, NetClient netClient, long minDelay, long maxDelay, long heartbeatDelay, String secret, String name) {

src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelServer.java

Lines changed: 147 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,15 @@
1414
import top.meethigher.proxy.tcp.tunnel.handler.TunnelHandler;
1515
import top.meethigher.proxy.tcp.tunnel.proto.TunnelMessage;
1616

17-
import java.util.HashSet;
18-
import java.util.Set;
17+
import java.util.LinkedHashMap;
18+
import java.util.Map;
19+
import java.util.concurrent.CountDownLatch;
1920
import java.util.concurrent.ThreadLocalRandom;
21+
import java.util.concurrent.atomic.AtomicBoolean;
2022

2123
/**
24+
*
25+
*
2226
* <p>背景:</p><p>我近期买了个树莓派,但是又不想随身带着树莓派,因此希望可以公网访问。</p>
2327
* <p>
2428
* 但是使用<a href="https://github.com/fatedier/frp">fatedier/frp</a>的过程中,不管在Windows还是Linux,都被扫出病毒了。
@@ -33,12 +37,12 @@ public class ReverseTcpProxyTunnelServer extends TunnelServer {
3337

3438
private static final Logger log = LoggerFactory.getLogger(ReverseTcpProxyTunnelServer.class);
3539
protected static final char[] ID_CHARACTERS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray();
36-
protected static final String SECRECT_TOKEN = "123456789";
40+
protected static final String SECRET_DEFAULT = "0123456789";
3741

3842

3943
protected String host = "0.0.0.0";
4044
protected int port = 44444;
41-
protected Set<NetSocket> authedSockets = new HashSet<>(); // 授权成功的socket列表
45+
protected Map<NetSocket, DataProxyServer> authedSockets = new LinkedHashMap<>();// 授权成功的socket列表
4246

4347
protected final String secret;
4448
protected final String name;
@@ -53,7 +57,10 @@ public ReverseTcpProxyTunnelServer(Vertx vertx, NetServer netServer, String secr
5357
socket.handler(decode(socket));
5458
socket.closeHandler(v -> {
5559
log.debug("closed {} -- {}", socket.remoteAddress(), socket.localAddress());
56-
authedSockets.remove(socket);
60+
DataProxyServer removed = authedSockets.remove(socket);
61+
if (removed != null) {
62+
removed.stop();
63+
}
5764
});
5865
TunnelHandler connectedHandler = tunnelHandlers.get(null);
5966
if (connectedHandler != null) {
@@ -71,6 +78,23 @@ protected void addMessageHandler() {
7178
// 监听连接成功事件
7279
this.onConnected((vertx1, netSocket, buffer) -> log.debug("{} connected", netSocket.remoteAddress()));
7380

81+
// 监听心跳事件
82+
this.on(TunnelMessageType.HEARTBEAT, new AbstractTunnelHandler() {
83+
@Override
84+
protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType type, byte[] bodyBytes) {
85+
if (authedSockets.containsKey(netSocket)) {
86+
netSocket.write(encode(TunnelMessageType.HEARTBEAT_ACK,
87+
TunnelMessage.HeartbeatAck.newBuilder().setTimestamp(System.currentTimeMillis())
88+
.buildPartial().toByteArray()));
89+
return true;
90+
} else {
91+
// 未经过授权的心跳,直接拒绝
92+
netSocket.close();
93+
return false;
94+
}
95+
}
96+
});
97+
7498
// 监听授权与开通端口事件
7599
this.on(TunnelMessageType.OPEN_DATA_PORT, new AbstractTunnelHandler() {
76100
@Override
@@ -79,16 +103,30 @@ protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType t
79103
boolean result = false;
80104
try {
81105
TunnelMessage.OpenDataPort parsed = TunnelMessage.OpenDataPort.parseFrom(bodyBytes);
106+
TunnelMessage.OpenDataPortAck.Builder builder = TunnelMessage.OpenDataPortAck
107+
.newBuilder();
82108
if (secret.equals(parsed.getSecret())) {
83-
final int tPort = parsed.getPort();
84-
109+
final DataProxyServer dataProxyServer = new DataProxyServer(vertx, parsed.getDataProxyName(), parsed.getDataProxyPort());
110+
if (dataProxyServer.startSync()) {
111+
result = true;
112+
builder.setSuccess(result).setMessage("success");
113+
netSocket.write(encode(TunnelMessageType.OPEN_DATA_PORT_ACK,
114+
builder.build().toByteArray()));
115+
authedSockets.put(netSocket, dataProxyServer);
116+
} else {
117+
builder.setSuccess(result).setMessage("fail to open data port " + parsed.getDataProxyPort());
118+
netSocket.write(encode(TunnelMessageType.OPEN_DATA_PORT_ACK,
119+
builder.build().toByteArray())).onComplete(ar -> netSocket.close());
120+
}
121+
85122
} else {
123+
TunnelMessage.OpenDataPortAck ack = TunnelMessage.OpenDataPortAck
124+
.newBuilder()
125+
.setSuccess(result)
126+
.setMessage("your secret is incorrect!")
127+
.build();
86128
netSocket.write(encode(TunnelMessageType.OPEN_DATA_PORT_ACK,
87-
TunnelMessage.OpenDataPortAck
88-
.newBuilder()
89-
.setSuccess(result)
90-
.setMessage("your secret is incorrect!")
91-
.build().toByteArray())).onComplete(ar -> netSocket.close());
129+
ack.toByteArray())).onComplete(ar -> netSocket.close());
92130
}
93131
} catch (Exception e) {
94132
}
@@ -106,11 +144,11 @@ public static ReverseTcpProxyTunnelServer create(Vertx vertx, NetServer netServe
106144
}
107145

108146
public static ReverseTcpProxyTunnelServer create(Vertx vertx, NetServer netServer) {
109-
return new ReverseTcpProxyTunnelServer(vertx, netServer, SECRECT_TOKEN, generateName());
147+
return new ReverseTcpProxyTunnelServer(vertx, netServer, SECRET_DEFAULT, generateName());
110148
}
111149

112150
public static ReverseTcpProxyTunnelServer create(Vertx vertx) {
113-
return new ReverseTcpProxyTunnelServer(vertx, vertx.createNetServer(), SECRECT_TOKEN, generateName());
151+
return new ReverseTcpProxyTunnelServer(vertx, vertx.createNetServer(), SECRET_DEFAULT, generateName());
114152
}
115153

116154

@@ -152,4 +190,99 @@ public void stop() {
152190
.onFailure(e -> log.error("{} close failed", name, e));
153191
}
154192

193+
194+
protected static class DataProxyServer {
195+
196+
protected final Vertx vertx;
197+
protected final NetServer netServer;
198+
protected final String name;
199+
protected final Handler<NetSocket> connectHandler;
200+
201+
protected final String host;
202+
protected final int port;
203+
204+
public DataProxyServer(Vertx vertx, String name,
205+
String host, int port) {
206+
this.vertx = vertx;
207+
this.name = name;
208+
this.host = host;
209+
this.port = port;
210+
this.netServer = this.vertx.createNetServer();
211+
this.connectHandler = socket -> {
212+
};
213+
}
214+
215+
public DataProxyServer(Vertx vertx, String name,
216+
int port) {
217+
this(vertx, name, "0.0.0.0", port);
218+
}
219+
220+
public void start() {
221+
this.netServer
222+
.connectHandler(this.connectHandler)
223+
.listen(port, host)
224+
.onComplete(ar -> {
225+
if (ar.succeeded()) {
226+
log.info("{} started on {}:{}", name, host, port);
227+
} else {
228+
Throwable e = ar.cause();
229+
log.error("{} start failed", name, e);
230+
}
231+
});
232+
}
233+
234+
public void stop() {
235+
this.netServer.close()
236+
.onSuccess(v -> log.info("{} closed", name))
237+
.onFailure(e -> log.error("{} close failed", name, e));
238+
}
239+
240+
public boolean startSync() {
241+
CountDownLatch latch = new CountDownLatch(1);
242+
AtomicBoolean success = new AtomicBoolean(false);
243+
this.netServer
244+
.connectHandler(this.connectHandler)
245+
.listen(port, host)
246+
.onComplete(ar -> {
247+
if (ar.succeeded()) {
248+
success.set(true);
249+
log.info("{} started on {}:{}", name, host, port);
250+
} else {
251+
Throwable e = ar.cause();
252+
log.error("{} start failed", name, e);
253+
}
254+
latch.countDown();
255+
});
256+
257+
try {
258+
latch.await();
259+
} catch (Exception ignore) {
260+
261+
}
262+
return success.get();
263+
}
264+
265+
public boolean stopSync() {
266+
CountDownLatch latch = new CountDownLatch(1);
267+
AtomicBoolean success = new AtomicBoolean(false);
268+
this.netServer.close()
269+
.onComplete(ar -> {
270+
if (ar.succeeded()) {
271+
success.set(true);
272+
log.info("{} closed", name);
273+
} else {
274+
log.error("{} close failed", name, ar.cause());
275+
}
276+
latch.countDown();
277+
});
278+
try {
279+
latch.await();
280+
} catch (Exception ignore) {
281+
282+
}
283+
return success.get();
284+
}
285+
286+
}
287+
155288
}

src/main/java/top/meethigher/proxy/tcp/tunnel/TunnelDataServer.java

Lines changed: 0 additions & 4 deletions
This file was deleted.

src/main/java/top/meethigher/proxy/tcp/tunnel/handler/AbstractTunnelHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ public void handle(Vertx vertx, NetSocket netSocket, Buffer buffer) {
2525
vertx.executeBlocking((Callable<Void>) () -> {
2626
TunnelMessageCodec.DecodedMessage decodedMessage = TunnelMessageCodec.decode(buffer);
2727
TunnelMessageType type = TunnelMessageType.fromCode(decodedMessage.type);
28+
log.debug("received message type = {}, doHandle ...", type);
2829
boolean result = doHandle(vertx, netSocket, type, decodedMessage.body);
29-
log.debug("received message type = {}, handle result = {}", type, result);
30+
log.debug("received message type = {}, doHandle result = {}", type, result);
3031
return null;
3132
});
3233
}

0 commit comments

Comments
 (0)