Skip to content

Commit 05b055c

Browse files
authored
Merge pull request #10 from meethigher/issue-9
数据连接传到后端连接的前8个字节为标识位。解决TunnelClient代码逻辑上丢失后面字节的bug
2 parents 43c2f1a + 629f5b7 commit 05b055c

File tree

7 files changed

+229
-43
lines changed

7 files changed

+229
-43
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>top.meethigher</groupId>
88
<artifactId>tcp-reverse-proxy</artifactId>
9-
<version>1.0.5</version>
9+
<version>1.0.6</version>
1010

1111
<properties>
1212
<maven.compiler.source>8</maven.compiler.source>

src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelClient.java

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType t
214214
if (buf.length() < 8) {
215215
return;
216216
}
217+
// note: 前8个字节是tunnel通信使用的。
217218
if (buf.getByte(0) == Tunnel.DATA_CONN_FLAG[0]
218219
&& buf.getByte(1) == Tunnel.DATA_CONN_FLAG[1]
219220
&& buf.getByte(2) == Tunnel.DATA_CONN_FLAG[2]
@@ -232,30 +233,46 @@ protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType t
232233
})
233234
.onSuccess(backendSocket -> {
234235
backendSocket.pause();
236+
// 若实际数据传输的长度大于8字节,那么后面的字节需要发出去。
237+
// https://github.com/meethigher/tcp-reverse-proxy/issues/9
238+
if (buf.length() > 8) {
239+
backendSocket.write(buf.getBuffer(8, buf.length()))
240+
.onSuccess(o -> log.debug("{}: sessionId {}, data connection {} -- {} write to backend connection {} -- {} succeeded",
241+
dataProxyName,
242+
sessionId,
243+
dataSocket.remoteAddress(), dataSocket.localAddress(),
244+
backendSocket.remoteAddress(), backendSocket.localAddress()));
245+
}
235246
log.debug("{}: sessionId {}, backend connection {} -- {} established", dataProxyName, sessionId, backendSocket.remoteAddress(), backendSocket.localAddress());
236247
// 双向生命周期绑定、双向数据转发
237248
// feat: v1.0.5以前的版本,在closeHandler里面,将对端连接也关闭。比如targetSocket关闭时,则将sourceSocket也关闭。
238249
// 结果导致在转发短连接时,出现了bug。参考https://github.com/meethigher/tcp-reverse-proxy/issues/6
239-
dataSocket.closeHandler(v -> {
240-
log.debug("{}: sessionId {}, data connection {} -- {} closed", dataProxyName, sessionId, dataSocket.remoteAddress(), dataSocket.localAddress());
241-
}).pipeTo(backendSocket).onFailure(e -> {
242-
log.error("{}: sessionId {}, data connection {} -- {} pipe to backend connection {} -- {} failed",
243-
dataProxyName,
244-
sessionId,
245-
dataSocket.remoteAddress(), dataSocket.localAddress(),
246-
backendSocket.remoteAddress(), backendSocket.localAddress(),
247-
e);
248-
});
249-
backendSocket.closeHandler(v -> {
250-
log.debug("{}: sessionId {}, backend connection {} -- {} closed", dataProxyName, sessionId, backendSocket.remoteAddress(), backendSocket.localAddress());
251-
}).pipeTo(dataSocket).onFailure(e -> {
252-
log.error("{}: sessionId {}, backend connection {} -- {} pipe to data connection {} -- {} failed",
253-
dataProxyName,
254-
sessionId,
255-
backendSocket.remoteAddress(), backendSocket.localAddress(),
256-
dataSocket.remoteAddress(), dataSocket.localAddress(),
257-
e);
258-
});
250+
dataSocket.closeHandler(v -> log.debug("{}: sessionId {}, data connection {} -- {} closed", dataProxyName, sessionId, dataSocket.remoteAddress(), dataSocket.localAddress()))
251+
.pipeTo(backendSocket)
252+
.onFailure(e -> log.error("{}: sessionId {}, data connection {} -- {} pipe to backend connection {} -- {} failed",
253+
dataProxyName,
254+
sessionId,
255+
dataSocket.remoteAddress(), dataSocket.localAddress(),
256+
backendSocket.remoteAddress(), backendSocket.localAddress(),
257+
e))
258+
.onSuccess(v -> log.debug("{}: sessionId {}, data connection {} -- {} pipe to backend connection {} -- {} succeeded",
259+
dataProxyName,
260+
sessionId,
261+
dataSocket.remoteAddress(), dataSocket.localAddress(),
262+
backendSocket.remoteAddress(), backendSocket.localAddress()));
263+
backendSocket.closeHandler(v -> log.debug("{}: sessionId {}, backend connection {} -- {} closed", dataProxyName, sessionId, backendSocket.remoteAddress(), backendSocket.localAddress()))
264+
.pipeTo(dataSocket)
265+
.onFailure(e -> log.error("{}: sessionId {}, backend connection {} -- {} pipe to data connection {} -- {} failed",
266+
dataProxyName,
267+
sessionId,
268+
backendSocket.remoteAddress(), backendSocket.localAddress(),
269+
dataSocket.remoteAddress(), dataSocket.localAddress(),
270+
e))
271+
.onSuccess(v -> log.debug("{}: sessionId {}, backend connection {} -- {} pipe to data connection {} -- {} succeeded",
272+
dataProxyName,
273+
sessionId,
274+
backendSocket.remoteAddress(), backendSocket.localAddress(),
275+
dataSocket.remoteAddress(), dataSocket.localAddress()));
259276
backendSocket.resume();
260277
dataSocket.resume();
261278
log.debug("{}: sessionId {}, data connection {} -- {} bound to backend connection {} -- {} for session id {}",

src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelServer.java

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ protected void handleConnect(NetSocket socket) {
9494
socket.pause();
9595
socket.handler(decode(socket));
9696
socket.closeHandler(v -> {
97-
log.debug("closed {} -- {}", socket.remoteAddress(), socket.localAddress());
97+
log.debug("{} -- {} closed", socket.remoteAddress(), socket.localAddress());
9898
DataProxyServer removed = authedSockets.remove(socket);
9999
if (removed != null) {
100100
removed.stop();
@@ -301,25 +301,29 @@ protected void bindConnections(UserConnection userConn, NetSocket dataSocket, in
301301
// 双向生命周期绑定、双向数据转发
302302
// feat: v1.0.5以前的版本,在closeHandler里面,将对端连接也关闭。比如targetSocket关闭时,则将sourceSocket也关闭。
303303
// 结果导致在转发短连接时,出现了bug。参考https://github.com/meethigher/tcp-reverse-proxy/issues/6
304-
userSocket.closeHandler(v -> {
305-
log.debug("{}: sessionId {}, user connection {} -- {} closed", name, sessionId, userSocket.remoteAddress(), userSocket.localAddress());
306-
}).pipeTo(dataSocket).onFailure(e -> {
307-
log.error("{}: sessionId {}, user connection {} -- {} pipe to data connection {} -- {} failed, connection will be closed",
308-
name,
309-
sessionId,
310-
userSocket.remoteAddress(), userSocket.localAddress(), dataSocket.remoteAddress(), dataSocket.localAddress(), e);
311-
});
312-
dataSocket.closeHandler(v -> {
313-
log.debug("{}: sessionId {}, data connection {} -- {} closed",
314-
name,
315-
sessionId,
316-
dataSocket.remoteAddress(), dataSocket.localAddress());
317-
}).pipeTo(userSocket).onFailure(e -> {
318-
log.error("{}: sessionId {}, data connection {} -- {} pipe to user connection {} -- {} failed, connection will be closed",
319-
name,
320-
sessionId,
321-
dataSocket.remoteAddress(), dataSocket.localAddress(), userSocket.remoteAddress(), userSocket.localAddress(), e);
322-
});
304+
userSocket.closeHandler(v -> log.debug("{}: sessionId {}, user connection {} -- {} closed", name, sessionId, userSocket.remoteAddress(), userSocket.localAddress()))
305+
.pipeTo(dataSocket)
306+
.onFailure(e -> log.error("{}: sessionId {}, user connection {} -- {} pipe to data connection {} -- {} failed",
307+
name,
308+
sessionId,
309+
userSocket.remoteAddress(), userSocket.localAddress(), dataSocket.remoteAddress(), dataSocket.localAddress(), e))
310+
.onSuccess(v -> log.debug("{}: sessionId {}, user connection {} -- {} pipe to data connection {} -- {} succeeded",
311+
name,
312+
sessionId,
313+
userSocket.remoteAddress(), userSocket.localAddress(), dataSocket.remoteAddress(), dataSocket.localAddress()));
314+
dataSocket.closeHandler(v -> log.debug("{}: sessionId {}, data connection {} -- {} closed",
315+
name,
316+
sessionId,
317+
dataSocket.remoteAddress(), dataSocket.localAddress()))
318+
.pipeTo(userSocket)
319+
.onFailure(e -> log.error("{}: sessionId {}, data connection {} -- {} pipe to user connection {} -- {} failed",
320+
name,
321+
sessionId,
322+
dataSocket.remoteAddress(), dataSocket.localAddress(), userSocket.remoteAddress(), userSocket.localAddress(), e))
323+
.onSuccess(v -> log.debug("{}: sessionId {}, data connection {} -- {} pipe to user connection {} -- {} succeeded",
324+
name,
325+
sessionId,
326+
dataSocket.remoteAddress(), dataSocket.localAddress(), userSocket.remoteAddress(), userSocket.localAddress()));
323327
log.debug("{}: sessionId {}, data connection {} -- {} bound to user connection {} -- {} for session id {}",
324328
name,
325329
sessionId,
@@ -331,7 +335,11 @@ protected void bindConnections(UserConnection userConn, NetSocket dataSocket, in
331335
.appendBytes(DATA_CONN_FLAG)
332336
.appendInt(sessionId)).onSuccess(v -> {
333337
// 将用户连接中的缓存数据发出。
334-
userConn.buffers.forEach(dataSocket::write);
338+
userConn.buffers.forEach(b -> dataSocket.write(b)
339+
.onSuccess(o -> log.debug("{}: sessionId {}, user connection {} -- {} write to data connection {} -- {} succeeded",
340+
name,
341+
sessionId,
342+
userSocket.remoteAddress(), userSocket.localAddress(), dataSocket.remoteAddress(), dataSocket.localAddress())));
335343
});
336344

337345
}
@@ -410,7 +418,7 @@ public boolean stopSync() {
410418
*/
411419
protected void addMessageHandler() {
412420
// 监听连接成功事件
413-
this.onConnected((vertx1, netSocket, buffer) -> log.debug("{} connected", netSocket.remoteAddress()));
421+
this.onConnected((vertx1, netSocket, buffer) -> log.debug("{} -- {} connected", netSocket.remoteAddress(), netSocket.localAddress()));
414422

415423
// 监听心跳事件
416424
this.on(TunnelMessageType.HEARTBEAT, new AbstractTunnelHandler() {
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package top.meethigher.proxy.tcp.tunnel;
2+
3+
import io.vertx.core.Vertx;
4+
import io.vertx.core.http.HttpClient;
5+
import io.vertx.core.http.HttpClientOptions;
6+
import io.vertx.core.http.PoolOptions;
7+
import io.vertx.core.http.RequestOptions;
8+
import org.junit.Assert;
9+
import org.junit.Test;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
13+
import java.util.concurrent.CountDownLatch;
14+
import java.util.concurrent.TimeUnit;
15+
16+
public class Issue9Test {
17+
private static final Logger log = LoggerFactory.getLogger(Issue9Test.class);
18+
19+
@Test
20+
public void test() throws Exception {
21+
22+
/**
23+
* IDEA 多配置“批量启动”——使用 Run/Debug Configuration 的“Compound”功能
24+
* 步骤如下:
25+
* 打开Run/Debug Configurations窗口(快捷键 Ctrl+Alt+R / 右上角 Edit Configurations)。
26+
*
27+
* 新建三个 Application 类型配置,分别设置好 ClassA、ClassB、ClassC 的 main。
28+
*
29+
* 再新建一个Compound类型配置。
30+
*
31+
* 在 Compound 配置的“Run/Debug Configurations”里,添加刚刚那三个配置,顺序拖拽即可调整。
32+
*
33+
* 选中 Compound 配置,点绿色启动按钮,一次性批量顺序启动三(或更多)个实例。
34+
*
35+
* 注意:Compound 是并发同时,不是顺序启动。IDEA 会按照你添加的顺序一个一个起。
36+
*/
37+
38+
// step1: run top.meethigher.proxy.tcp.tunnel.issue9.Issue9SimpleHttpServer.main
39+
40+
// step2: run top.meethigher.proxy.tcp.tunnel.issue9.Issue9TunnelServer.main
41+
42+
// step3: run top.meethigher.proxy.tcp.tunnel.issue9.Issue9TunnelClient.main
43+
44+
Vertx vertx = Vertx.vertx();
45+
HttpClient httpClient = vertx.createHttpClient(new HttpClientOptions()
46+
.setKeepAlive(true)
47+
.setMaxPoolSize(Runtime.getRuntime().availableProcessors()),
48+
new PoolOptions().setHttp1MaxSize(100));
49+
int total = 500;
50+
CountDownLatch latch = new CountDownLatch(total);
51+
for (int i = 0; i < total; i++) {
52+
final String id = String.valueOf(i + 1);
53+
RequestOptions requestOptions = new RequestOptions()
54+
.setAbsoluteURI("http://127.0.0.1:808/api");
55+
httpClient.request(requestOptions).onSuccess(req -> {
56+
req.send().onSuccess(resp -> {
57+
log.info("{} succeeded", id);
58+
latch.countDown();
59+
});
60+
});
61+
}
62+
63+
latch.await(5, TimeUnit.SECONDS);
64+
Assert.assertEquals(total, total - latch.getCount());
65+
66+
}
67+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package top.meethigher.proxy.tcp.tunnel.issue9;
2+
3+
import io.vertx.core.Vertx;
4+
import io.vertx.core.http.HttpServer;
5+
import io.vertx.core.http.HttpServerResponse;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
public class Issue9SimpleHttpServer {
10+
11+
private static final Logger log = LoggerFactory.getLogger(Issue9SimpleHttpServer.class);
12+
private static final Vertx vertx = Vertx.vertx();
13+
14+
public static void main(String[] args) {
15+
final String result = "<!DOCTYPE html><html><head><meta charset=utf-8><meta name=viewport content=\"width=device-width,initial-scale=1\"><title>novnc2</title><link href=./static/css/app.c90a3de1d5a865cd4149616f9b8040a5.css rel=stylesheet></head><body><div id=app></div><script type=text/javascript src=./static/js/manifest.3ad1d5771e9b13dbdad2.js></script><script type=text/javascript src=./static/js/vendor.eba9acff8b0b3b1b22c4.js></script><script type=text/javascript src=./static/js/app.fef011cae8f5fbff4b55.js></script></body></html>";
16+
HttpServer httpServer = vertx.createHttpServer();
17+
httpServer.requestHandler(req -> {
18+
String address = req.connection().remoteAddress().toString();
19+
log.info("{} connected", address);
20+
req.connection().closeHandler(v -> {
21+
log.info("{} closed", address);
22+
});
23+
HttpServerResponse response = req.response();
24+
response.putHeader("Server", "nginx/1.18.0 (Ubuntu)");
25+
response.putHeader("Date", "Mon, 02 Jun 2025 07:22:44 GMT");
26+
response.putHeader("Content-Type", "text/html");
27+
response.putHeader("Content-Length", "512");
28+
response.putHeader("Last-Modified", "Sun, 04 Apr 2021 03:44:30 GMT");
29+
response.putHeader("ETag", "\"6069361e-200\"");
30+
response.putHeader("Accept-Ranges", "bytes");
31+
response.setStatusCode(200).end(result);
32+
}).listen(80).onFailure(e -> {
33+
log.error("http server start failed", e);
34+
System.exit(1);
35+
});
36+
}
37+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package top.meethigher.proxy.tcp.tunnel.issue9;
2+
3+
import io.vertx.core.Vertx;
4+
import io.vertx.core.net.NetClient;
5+
import io.vertx.core.net.NetClientOptions;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import top.meethigher.proxy.tcp.tunnel.ReverseTcpProxyTunnelClient;
9+
10+
import java.util.concurrent.TimeUnit;
11+
12+
public class Issue9TunnelClient {
13+
14+
private static final Vertx vertx = Vertx.vertx();
15+
private static final Logger log = LoggerFactory.getLogger(Issue9TunnelClient.class);
16+
17+
public static void main(String[] args) {
18+
NetClient netClient = vertx.createNetClient(new NetClientOptions()
19+
.setIdleTimeout(999999999)
20+
.setIdleTimeoutUnit(TimeUnit.MILLISECONDS));
21+
ReverseTcpProxyTunnelClient.create(vertx, netClient)
22+
.dataProxyPort(808)
23+
.dataProxyHost("127.0.0.1")
24+
.dataProxyName("http")
25+
.backendHost("127.0.0.1")
26+
.backendPort(80)
27+
.connect("127.0.0.1", 44444);
28+
}
29+
}

0 commit comments

Comments
 (0)