Skip to content

Commit 3aa2f74

Browse files
committed
feat: 1. 完成muxserver的处理逻辑 2. 进一步封装muxclient与muxserver并进行单元测试
1 parent 2e5f45f commit 3aa2f74

File tree

7 files changed

+162
-24
lines changed

7 files changed

+162
-24
lines changed

src/main/java/top/meethigher/proxy/tcp/mux/Mux.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public abstract class Mux {
2727
/**
2828
* 默认对称加密密钥
2929
*/
30-
public static final String SECRET_DEFAULT = "1234567890abcdef";
30+
public static final String SECRET_DEFAULT = "hello,meethigher";
3131

3232
public static final short type = 5209;
3333

src/main/java/top/meethigher/proxy/tcp/mux/ReverseTcpProxyMuxClient.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import top.meethigher.proxy.tcp.mux.model.MuxNetAddress;
1313

1414
import java.util.ArrayList;
15-
import java.util.HashMap;
1615
import java.util.List;
1716
import java.util.Map;
1817
import java.util.concurrent.ThreadLocalRandom;
@@ -40,18 +39,18 @@ public class ReverseTcpProxyMuxClient extends Mux {
4039

4140
protected final NetClient netClient;
4241

43-
protected final NetAddress muxServer;
42+
protected final NetAddress muxServerAddress;
4443

4544
protected final String name;
4645

4746
protected final List<NetServer> netServers = new ArrayList<>();
4847

49-
public ReverseTcpProxyMuxClient(Vertx vertx, String secret, Map<MuxNetAddress, NetAddress> mapper, NetServerOptions netServerOptions, NetClient netClient, NetAddress muxServer, String name) {
48+
public ReverseTcpProxyMuxClient(Vertx vertx, String secret, Map<MuxNetAddress, NetAddress> mapper, NetServerOptions netServerOptions, NetClient netClient, NetAddress muxServerAddress, String name) {
5049
super(vertx, secret);
5150
this.mapper = mapper;
5251
this.netServerOptions = netServerOptions;
5352
this.netClient = netClient;
54-
this.muxServer = muxServer;
53+
this.muxServerAddress = muxServerAddress;
5554
this.name = name;
5655
}
5756

@@ -61,9 +60,9 @@ protected void handleConnect(NetSocket src, MuxNetAddress localServer, NetAddres
6160
log.debug("{}: source {} -- {} connected", localServer.getName(), src.localAddress(), src.remoteAddress());
6261
src.exceptionHandler(e -> log.error("{}: source {} -- {} exception occurred", localServer.getName(), src.localAddress(), src.remoteAddress(), e))
6362
.closeHandler(v -> log.debug("{}: source {} -- {} closed", localServer.getName(), src.localAddress(), src.remoteAddress()));
64-
netClient.connect(muxServer.getPort(), muxServer.getHost())
63+
netClient.connect(muxServerAddress.getPort(), muxServerAddress.getHost())
6564
.onFailure(e -> {
66-
log.error("{}: failed to connect to {}", localServer.getName(), muxServer, e);
65+
log.error("{}: failed to connect to {}", localServer.getName(), muxServerAddress, e);
6766
src.close();
6867
})
6968
.onSuccess(dst -> {
@@ -114,7 +113,7 @@ public void start() {
114113
name,
115114
local.getName(),
116115
local,
117-
muxServer,
116+
muxServerAddress,
118117
mapper.get(local)
119118
);
120119
netServers.add(v);
@@ -150,19 +149,20 @@ public static String generateName() {
150149
}
151150
}
152151

153-
public static ReverseTcpProxyMuxClient create() {
154-
Vertx vertx = Vertx.vertx();
155-
return new ReverseTcpProxyMuxClient(vertx, Mux.SECRET_DEFAULT,
156-
new HashMap<MuxNetAddress, NetAddress>() {{
157-
put(new MuxNetAddress("0.0.0.0", 6666, "ssh1"),
158-
new NetAddress("127.0.0.1", 22));
159-
put(new MuxNetAddress("0.0.0.0", 6667, "ssh2"),
160-
new NetAddress("127.0.0.1", 22));
161-
162-
}}, new NetServerOptions(), vertx.createNetClient(),
163-
new NetAddress("10.0.0.30", 22),
164-
generateName());
152+
public static ReverseTcpProxyMuxClient create(Vertx vertx, String secret, Map<MuxNetAddress, NetAddress> mapper, NetServerOptions netServerOptions, NetClient netClient, NetAddress muxServerAddress, String name) {
153+
return new ReverseTcpProxyMuxClient(vertx, secret, mapper, netServerOptions, netClient, muxServerAddress, name);
165154
}
166155

156+
public static ReverseTcpProxyMuxClient create(Vertx vertx, String secret, Map<MuxNetAddress, NetAddress> mapper, NetServerOptions netServerOptions, NetClient netClient, NetAddress muxServerAddress) {
157+
return create(vertx, secret, mapper, netServerOptions, netClient, muxServerAddress, generateName());
158+
}
159+
160+
public static ReverseTcpProxyMuxClient create(Vertx vertx, String secret, Map<MuxNetAddress, NetAddress> mapper, NetAddress muxServerAddress) {
161+
return create(vertx, secret, mapper, new NetServerOptions(), vertx.createNetClient(), muxServerAddress, generateName());
162+
}
163+
164+
public static ReverseTcpProxyMuxClient create(Vertx vertx, Map<MuxNetAddress, NetAddress> mapper, NetAddress muxServerAddress) {
165+
return create(vertx, SECRET_DEFAULT, mapper, new NetServerOptions(), vertx.createNetClient(), muxServerAddress, generateName());
166+
}
167167

168168
}

src/main/java/top/meethigher/proxy/tcp/mux/ReverseTcpProxyMuxServer.java

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ protected ReverseTcpProxyMuxServer(Vertx vertx, String secret, NetServer netServ
3939
protected void handleConnect(NetSocket src) {
4040
src.pause();
4141
log.debug("source {} -- {} connected", src.localAddress(), src.remoteAddress());
42+
// 由于内部都是使用pipe来进行数据传输,所以exceptionHandler肯定是都重新注册过了,参考{@code io.vertx.core.streams.impl.PipeImpl.PipeImpl }
43+
// 但如果还没进入pipe前,连接出现异常,那么就会触发此处的exceptionHandler。https://github.com/meethigher/tcp-reverse-proxy/issues/18
4244
src.exceptionHandler(e -> log.error("source {} -- {} exception occurred", src.localAddress(), src.remoteAddress(), e))
4345
.closeHandler(v -> log.debug("source {} -- {} closed", src.localAddress(), src.remoteAddress()));
4446
src.handler(new MuxMessageParser(muxMsg -> this.bindMuxConnections(src, muxMsg), src));
@@ -65,8 +67,36 @@ protected void bindMuxConnections(NetSocket src, MuxMessageParser.MuxMessage mux
6567
})
6668
.onSuccess(dst -> {
6769
dst.pause();
68-
log.debug("target {} -- {} connected",dst.localAddress(),dst.remoteAddress());
69-
70+
log.debug("target {} -- {} connected", dst.localAddress(), dst.remoteAddress());
71+
// 由于内部都是使用pipe来进行数据传输,所以exceptionHandler肯定是都重新注册过了,参考{@code io.vertx.core.streams.impl.PipeImpl.PipeImpl }
72+
// 但如果还没进入pipe前,连接出现异常,那么就会触发此处的exceptionHandler。https://github.com/meethigher/tcp-reverse-proxy/issues/18
73+
dst.exceptionHandler(e -> log.error("target {} -- {} exception occurred", dst.localAddress(), dst.remoteAddress(), e))
74+
.closeHandler(v -> log.debug("target {} -- {} closed", dst.localAddress(), dst.remoteAddress()));
75+
/**
76+
* 不能使用write的成功与否判断链路是否正常,但是可以通过write.onSuccess保证顺序写入。
77+
* 测试中发现,即便链路异常,返回仍然是true 参考 https://github.com/meethigher/bug-test/blob/vertx-network-disconnect/src/main/java/top/meethigher/BugTest.java
78+
* write是把数据复制到缓冲区,缓冲区有空间一般就不会失败
79+
* 本地测试示例 top.meethigher.proxy.tcp.NetSocketWriteFailureTest
80+
*/
81+
dst.write(muxMsg.payload)
82+
.onSuccess(t -> {
83+
// https://github.com/meethigher/tcp-reverse-proxy/issues/12
84+
// 将日志记录详细,便于排查问题
85+
src.pipeTo(dst)
86+
.onSuccess(v -> log.debug("source {} -- {} pipe to target {} -- {} succeeded",
87+
src.localAddress(), src.remoteAddress(), dst.localAddress(), dst.remoteAddress()))
88+
.onFailure(e -> log.error("source {} -- {} pipe to target {} -- {} failed",
89+
src.localAddress(), src.remoteAddress(), dst.localAddress(), dst.remoteAddress(), e));
90+
dst.pipeTo(src)
91+
.onSuccess(v -> log.debug("target {} -- {} pipe to source {} -- {} succeeded",
92+
dst.localAddress(), dst.remoteAddress(), src.localAddress(), src.remoteAddress()))
93+
.onFailure(e -> log.error("target {} -- {} pipe to source {} -- {} failed",
94+
dst.localAddress(), dst.remoteAddress(), src.localAddress(), src.remoteAddress(), e));
95+
log.debug("source {} -- {} bound to target {} -- {}", src.localAddress(), src.remoteAddress(),
96+
dst.localAddress(), dst.remoteAddress());
97+
src.resume();
98+
dst.resume();
99+
});
70100
});
71101

72102
}
@@ -90,6 +120,16 @@ public static String generateName() {
90120
}
91121
}
92122

123+
public ReverseTcpProxyMuxServer host(String host) {
124+
this.host = host;
125+
return this;
126+
}
127+
128+
public ReverseTcpProxyMuxServer port(int port) {
129+
this.port = port;
130+
return this;
131+
}
132+
93133
public void start() {
94134
netServer.connectHandler(this::handleConnect)
95135
.exceptionHandler(e -> log.error("{} socket errors happening before the connection is passed to the connectHandler", name, e))
@@ -105,5 +145,21 @@ public void stop() {
105145

106146
}
107147

148+
public static ReverseTcpProxyMuxServer create(Vertx vertx, String secret, NetServer netServer, NetClient netClient, String name) {
149+
return new ReverseTcpProxyMuxServer(vertx, secret, netServer, netClient, name);
150+
}
151+
152+
public static ReverseTcpProxyMuxServer create(Vertx vertx, String secret, NetServer netServer, NetClient netClient) {
153+
return create(vertx, secret, netServer, netClient, generateName());
154+
}
155+
156+
public static ReverseTcpProxyMuxServer create(Vertx vertx, String secret) {
157+
return create(vertx, secret, vertx.createNetServer(), vertx.createNetClient(), generateName());
158+
}
159+
160+
public static ReverseTcpProxyMuxServer create(Vertx vertx) {
161+
return create(vertx, SECRET_DEFAULT, vertx.createNetServer(), vertx.createNetClient(), generateName());
162+
}
163+
108164

109165
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package top.meethigher.proxy.tcp;
2+
3+
import io.vertx.core.Vertx;
4+
import org.junit.Test;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
8+
import java.util.UUID;
9+
import java.util.concurrent.locks.LockSupport;
10+
11+
public class NetSocketWriteFailureTest {
12+
13+
private static final Logger log = LoggerFactory.getLogger(NetSocketWriteFailureTest.class);
14+
15+
@Test
16+
public void step1() {
17+
Vertx vertx = Vertx.vertx();
18+
vertx.createNetServer()
19+
.connectHandler(src -> {
20+
vertx.setTimer(3000, id -> {
21+
/**
22+
* 不能使用write的成功与否判断链路是否正常,但是可以通过write.onSuccess保证顺序写入。
23+
* 测试中发现,即便链路异常,返回仍然是true
24+
*
25+
* write是把数据复制到缓冲区,缓冲区有空间一般就不会失败
26+
*/
27+
src.write(UUID.randomUUID().toString()).onComplete(ar -> {
28+
log.info("server -> client write result: {}", ar.succeeded());
29+
});
30+
});
31+
})
32+
.exceptionHandler(e -> {
33+
log.error("socket errors happening before the connection is passed to the connectHandler", e);
34+
})
35+
.listen(8080).onFailure(e -> {
36+
System.exit(1);
37+
});
38+
39+
40+
LockSupport.park();
41+
42+
}
43+
44+
@Test
45+
public void step2() {
46+
Vertx vertx = Vertx.vertx();
47+
vertx.createNetClient().connect(8080, "127.0.0.1").onSuccess(src -> {
48+
System.exit(1);
49+
});
50+
51+
LockSupport.park();
52+
}
53+
}

src/test/java/top/meethigher/proxy/tcp/mux/MuxTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,6 @@ public void name() {
2626
Buffer encode = mux.aesBase64Encode(netAddress1);
2727

2828
NetAddress decode = mux.aesBase64Decode(encode);
29-
System.out.println();
29+
System.out.println(decode);
3030
}
3131
}

src/test/java/top/meethigher/proxy/tcp/mux/ReverseTcpProxyMuxClientTest.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,25 @@
11
package top.meethigher.proxy.tcp.mux;
22

3+
import io.vertx.core.Vertx;
34
import org.junit.Test;
5+
import top.meethigher.proxy.NetAddress;
6+
import top.meethigher.proxy.tcp.mux.model.MuxNetAddress;
47

8+
import java.util.LinkedHashMap;
9+
import java.util.Map;
510
import java.util.concurrent.TimeUnit;
611
import java.util.concurrent.locks.LockSupport;
712

813
public class ReverseTcpProxyMuxClientTest {
914
@Test
1015
public void name() throws Exception {
11-
ReverseTcpProxyMuxClient client = ReverseTcpProxyMuxClient.create();
16+
Vertx vertx = Vertx.vertx();
17+
Map<MuxNetAddress, NetAddress> mapper = new LinkedHashMap<>();
18+
mapper.put(new MuxNetAddress("0.0.0.0", 6666, "ssh20"), new NetAddress("10.0.0.20", 22));
19+
mapper.put(new MuxNetAddress("0.0.0.0", 6667, "ssh30"), new NetAddress("10.0.0.30", 22));
20+
mapper.put(new MuxNetAddress("0.0.0.0", 6668, "http"), new NetAddress("reqres.in", 443));
21+
NetAddress muxServerAddress = new NetAddress("127.0.0.1", 8080);
22+
ReverseTcpProxyMuxClient client = ReverseTcpProxyMuxClient.create(vertx, mapper, muxServerAddress);
1223
client.start();
1324

1425
TimeUnit.HOURS.sleep(2);
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package top.meethigher.proxy.tcp.mux;
2+
3+
import io.vertx.core.Vertx;
4+
import org.junit.Test;
5+
6+
import java.util.concurrent.locks.LockSupport;
7+
8+
public class ReverseTcpProxyMuxServerTest {
9+
10+
@Test
11+
public void name() {
12+
ReverseTcpProxyMuxServer.create(Vertx.vertx())
13+
.port(8080)
14+
.start();
15+
16+
LockSupport.park();
17+
}
18+
}

0 commit comments

Comments
 (0)