Skip to content

Commit a814395

Browse files
committed
fix: 解决导致issue-7的解码问题
1 parent e3b06c7 commit a814395

File tree

6 files changed

+214
-49
lines changed

6 files changed

+214
-49
lines changed

src/main/java/top/meethigher/proxy/tcp/tunnel/codec/TunnelMessageParser.java

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -63,36 +63,44 @@ public void handle(Buffer buffer) {
6363
if (buf.length() < lengthFieldLength) {
6464
return;
6565
} else {
66-
int totalLength = buf.getInt(lengthFieldOffset);
67-
// 校验最大长度
68-
if (totalLength > maxLength) {
69-
log.warn("too many bytes in length field, {} > {}, connection {} will be closed",
70-
totalLength, maxLength,
71-
netSocket.remoteAddress());
66+
parse();
67+
}
68+
69+
}
70+
71+
private void parse() {
72+
int totalLength = buf.getInt(lengthFieldOffset);
73+
// 校验最大长度
74+
if (totalLength > maxLength) {
75+
log.warn("too many bytes in length field, {} > {}, connection {} will be closed",
76+
totalLength, maxLength,
77+
netSocket.remoteAddress());
78+
netSocket.close();
79+
return;
80+
}
81+
// 校验类型编码是否在预设范围内
82+
if (totalLength >= (lengthFieldLength + typeFieldLength)) {
83+
short code = buf.getShort(lengthFieldLength);
84+
try {
85+
TunnelMessageType.fromCode(code);
86+
} catch (Exception e) {
87+
log.error("invalid type, connection {} will be closed", netSocket.remoteAddress(), e);
7288
netSocket.close();
7389
return;
7490
}
75-
// 校验类型编码是否在预设范围内
76-
if (totalLength >= (lengthFieldLength + typeFieldLength)) {
77-
short code = buf.getShort(lengthFieldLength);
78-
try {
79-
TunnelMessageType.fromCode(code);
80-
} catch (Exception e) {
81-
log.error("invalid type, connection {} will be closed", netSocket.remoteAddress(), e);
82-
netSocket.close();
83-
return;
84-
}
85-
}
91+
}
8692

87-
// 校验是否达到预设总长度
88-
if (buf.length() < totalLength) {
89-
return;
90-
} else {
91-
outputHandler.handle(buf.getBuffer(0, totalLength));
92-
buf = buf.getBuffer(totalLength, buf.length());
93-
return;
93+
// 校验是否达到预设总长度
94+
if (buf.length() < totalLength) {
95+
return;
96+
} else {
97+
outputHandler.handle(buf.getBuffer(0, totalLength));
98+
buf = buf.getBuffer(totalLength, buf.length());
99+
// 缓冲区未清空时,要将数据进一步解析。https://github.com/meethigher/tcp-reverse-proxy/issues/7
100+
if (buf.length() > 0) {
101+
parse();
94102
}
103+
return;
95104
}
96-
97105
}
98106
}

src/test/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelClientTest.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -131,22 +131,14 @@ public void client() {
131131
Vertx vertx = Vertx.vertx(new VertxOptions().setAddressResolverOptions(
132132
new AddressResolverOptions().setQueryTimeout(2000)
133133
));
134-
// http内网穿透
135-
ReverseTcpProxyTunnelClient.create(ReverseTcpProxyTunnelClientTest.vertx, vertx.createNetClient())
136-
.backendHost("reqres.in")
137-
.backendPort(80)
138-
.dataProxyName("http-proxy")
139-
.dataProxyHost("127.0.0.1")
140-
.dataProxyPort(80)
141-
.connect("127.0.0.1", 44444);
142134
// ssh内网穿透
143-
ReverseTcpProxyTunnelClient.create(ReverseTcpProxyTunnelClientTest.vertx, vertx.createNetClient())
144-
.backendHost("meethigher.top")
135+
ReverseTcpProxyTunnelClient.create(ReverseTcpProxyTunnelClientTest.vertx, vertx.createNetClient(), "helloworld")
136+
.backendHost("10.0.0.9")
145137
.backendPort(22)
146138
.dataProxyName("ssh-proxy")
147-
.dataProxyHost("127.0.0.1")
148-
.dataProxyPort(22)
149-
.connect("127.0.0.1", 44444);
139+
.dataProxyHost("10.0.0.1")
140+
.dataProxyPort(2222)
141+
.connect("10.0.0.1", 44444);
150142
LockSupport.park();
151143
}
152144
}
Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,24 @@
11
package top.meethigher.proxy.tcp.tunnel;
22

33
import io.vertx.core.Vertx;
4-
import io.vertx.core.VertxOptions;
5-
import io.vertx.core.net.NetServer;
64
import io.vertx.core.net.NetServerOptions;
75
import org.junit.Test;
86

9-
import java.time.Duration;
10-
import java.util.concurrent.TimeUnit;
7+
import java.util.concurrent.ConcurrentHashMap;
118
import java.util.concurrent.locks.LockSupport;
129

1310
public class ReverseTcpProxyTunnelServerTest {
1411

1512

1613
@Test
1714
public void server() {
18-
Vertx vertx = Vertx.vertx(new VertxOptions().setMaxEventLoopExecuteTime(Duration.ofDays(1).toNanos()));
19-
// 设置空闲超时,注意该超时参数,应该大于客户端的心跳时间
20-
NetServer netServer = vertx.createNetServer(new NetServerOptions().setIdleTimeout(10).setIdleTimeoutUnit(TimeUnit.SECONDS));
21-
ReverseTcpProxyTunnelServer.create(vertx, netServer)
22-
.judgeDelay(500)
23-
.heartbeatDelay(9000)
24-
.start();
15+
Vertx vertx = Vertx.vertx();
16+
ReverseTcpProxyTunnelServer server = ReverseTcpProxyTunnelServer.create(vertx, vertx.createNetServer(
17+
new NetServerOptions().setTcpNoDelay(true)
18+
), "helloworld", new ConcurrentHashMap<>())
19+
.port(44444)
20+
.judgeDelay(300);
21+
server.start();
2522
LockSupport.park();
2623
}
2724
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package top.meethigher.proxy.tcp.tunnel;
2+
3+
import io.vertx.core.Vertx;
4+
import io.vertx.core.net.NetClient;
5+
import io.vertx.core.net.NetSocket;
6+
import org.junit.Test;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
10+
import java.util.concurrent.atomic.AtomicInteger;
11+
import java.util.concurrent.locks.LockSupport;
12+
13+
/**
14+
* 该内容参考https://github.com/meethigher/tcp-reverse-proxy/issues/7
15+
*
16+
* @author <a href="https://meethigher.top">chenchuancheng</a>
17+
* @since 2025/05/11 13:01
18+
*/
19+
public class TunnelTest {
20+
21+
private static final Logger log = LoggerFactory.getLogger(TunnelTest.class);
22+
23+
@Test
24+
public void t() throws Exception {
25+
/**
26+
* 内网穿透实际大批量使用时,未配对的用户连接堆积,导致后续新来的用户连接等待配对的耗时越来越长。
27+
*/
28+
Vertx vertx = Vertx.vertx();
29+
NetClient netClient = vertx.createNetClient();
30+
int initialValue = 100;
31+
int batchReq = 10;
32+
33+
AtomicInteger ai = new AtomicInteger(1);
34+
35+
for (int i = 0; i < initialValue; i++) {
36+
final String id = ai.getAndIncrement() + "";
37+
netClient.connect(2222, "10.0.0.1").onComplete(ar -> {
38+
if (ar.succeeded()) {
39+
NetSocket socket = ar.result();
40+
long start = System.currentTimeMillis();
41+
socket.pause();
42+
socket.handler(buf -> {
43+
log.info("{} consumed {} ms, response: {}", id, System.currentTimeMillis() - start, buf.toString());
44+
});
45+
socket.closeHandler(t -> {
46+
log.info("{} consumed {} ms, closed", id, System.currentTimeMillis() - start);
47+
});
48+
socket.resume();
49+
} else {
50+
log.error("{}", id, ar.cause());
51+
}
52+
});
53+
}
54+
55+
LockSupport.park();
56+
}
57+
58+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package top.meethigher.proxy.tcp.tunnel.codec;
2+
3+
import io.vertx.core.Vertx;
4+
import io.vertx.core.VertxOptions;
5+
import io.vertx.core.net.NetSocket;
6+
import org.junit.Test;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
import top.meethigher.proxy.tcp.tunnel.TunnelClient;
10+
import top.meethigher.proxy.tcp.tunnel.handler.AbstractTunnelHandler;
11+
12+
import java.nio.charset.StandardCharsets;
13+
import java.util.concurrent.TimeUnit;
14+
import java.util.concurrent.locks.LockSupport;
15+
16+
/**
17+
* 用于定位问题https://github.com/meethigher/tcp-reverse-proxy/issues/7
18+
*
19+
* @author <a href="https://meethigher.top">chenchuancheng</a>
20+
* @since 2025/05/11 17:22
21+
*/
22+
public class ClientCodecTest {
23+
24+
private static final Logger log = LoggerFactory.getLogger(ClientCodecTest.class);
25+
26+
@Test
27+
public void client() {
28+
Vertx vertx = Vertx.vertx(new VertxOptions().setMaxWorkerExecuteTimeUnit(TimeUnit.DAYS));
29+
TunnelClient tunnelClient = new TunnelClient(vertx, vertx.createNetClient(), 1000, 64000) {
30+
31+
};
32+
tunnelClient.connect("127.0.0.1", 8080);
33+
tunnelClient.on(TunnelMessageType.OPEN_DATA_PORT, new AbstractTunnelHandler() {
34+
@Override
35+
protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType type, byte[] bodyBytes) {
36+
log.info("<--open_data_port: {}", new String(bodyBytes, StandardCharsets.UTF_8));
37+
return false;
38+
}
39+
});
40+
41+
LockSupport.park();
42+
}
43+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package top.meethigher.proxy.tcp.tunnel.codec;
2+
3+
import io.vertx.core.Vertx;
4+
import io.vertx.core.buffer.Buffer;
5+
import io.vertx.core.net.NetServer;
6+
import io.vertx.core.net.NetSocket;
7+
import org.junit.Test;
8+
import top.meethigher.proxy.tcp.tunnel.proto.TunnelMessage;
9+
10+
import java.util.concurrent.ExecutorService;
11+
import java.util.concurrent.Executors;
12+
import java.util.concurrent.atomic.AtomicInteger;
13+
import java.util.concurrent.locks.LockSupport;
14+
15+
/**
16+
* 用于定位问题https://github.com/meethigher/tcp-reverse-proxy/issues/7
17+
*
18+
* @author <a href="https://meethigher.top">chenchuancheng</a>
19+
* @since 2025/05/11 17:22
20+
*/
21+
public class ServerCodecTest {
22+
23+
@Test
24+
public void server() {
25+
26+
ExecutorService executorService = Executors.newFixedThreadPool(10);
27+
28+
Vertx vertx = Vertx.vertx();
29+
NetServer netServer = vertx.createNetServer();
30+
netServer.connectHandler(socket -> {
31+
socket.pause();
32+
for (int i = 0; i < 100; i++) {
33+
executorService.execute(new MultiThreadWrite(socket, vertx));
34+
}
35+
socket.resume();
36+
}).listen(8080).onFailure(e -> {
37+
e.printStackTrace();
38+
System.exit(1);
39+
});
40+
LockSupport.park();
41+
}
42+
43+
public static class MultiThreadWrite implements Runnable {
44+
45+
private static final AtomicInteger ai = new AtomicInteger(0);
46+
47+
private final NetSocket socket;
48+
49+
private final Vertx vertx;
50+
51+
public MultiThreadWrite(NetSocket socket, Vertx vertx) {
52+
this.socket = socket;
53+
this.vertx = vertx;
54+
}
55+
56+
@Override
57+
public void run() {
58+
final int i = ai.incrementAndGet();
59+
TunnelMessage.OpenDataPort test = TunnelMessage.OpenDataPort.newBuilder()
60+
.setSecret(i + "-hello world").build();
61+
Buffer buffer = TunnelMessageCodec.encode(TunnelMessageType.OPEN_DATA_PORT.code(), test.toByteArray());
62+
socket.write(buffer).onComplete(ar -> {
63+
System.out.println(i + "-->open data port, result:" + ar.succeeded());
64+
});
65+
}
66+
}
67+
}

0 commit comments

Comments
 (0)