Skip to content

Commit a07c1d6

Browse files
committed
feat: 1. 日志规整性调整 2. 心跳频率由服务端决定
1 parent ce03010 commit a07c1d6

File tree

7 files changed

+142
-46
lines changed

7 files changed

+142
-46
lines changed

src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelClient.java

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,15 @@
3838
public class ReverseTcpProxyTunnelClient extends TunnelClient {
3939
private static final Logger log = LoggerFactory.getLogger(ReverseTcpProxyTunnelClient.class);
4040
protected static final char[] ID_CHARACTERS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray();
41-
protected static final long HEARTBEAT_DELAY_DEFAULT = 5000;// 毫秒
41+
4242
protected static final long MIN_DELAY_DEFAULT = 1000;// 毫秒
4343
protected static final long MAX_DELAY_DEFAULT = 64000;// 毫秒
4444

4545

46-
protected final long heartbeatDelay;
4746
protected final String secret;
4847
protected final String name;
4948

50-
49+
protected long heartbeatDelay;
5150
protected String backendHost = "meethigher.top";
5251
protected int backendPort = 22;
5352
protected String dataProxyHost = "127.0.0.1";
@@ -75,10 +74,9 @@ protected static String generateName() {
7574
}
7675

7776
protected ReverseTcpProxyTunnelClient(Vertx vertx, NetClient netClient,
78-
long minDelay, long maxDelay, long heartbeatDelay,
77+
long minDelay, long maxDelay,
7978
String secret, String name) {
8079
super(vertx, netClient, minDelay, maxDelay);
81-
this.heartbeatDelay = heartbeatDelay;
8280
this.secret = secret;
8381
this.name = name;
8482
addMessageHandler();
@@ -106,24 +104,25 @@ public ReverseTcpProxyTunnelClient dataProxyPort(int dataProxyPort) {
106104

107105
public ReverseTcpProxyTunnelClient dataProxyName(String dataProxyName) {
108106
this.dataProxyName = dataProxyName;
107+
super.name = this.dataProxyName;
109108
return this;
110109
}
111110

112-
public static ReverseTcpProxyTunnelClient create(Vertx vertx, NetClient netClient, long minDelay, long maxDelay, long heartbeatDelay, String secret, String name) {
113-
return new ReverseTcpProxyTunnelClient(vertx, netClient, minDelay, maxDelay, heartbeatDelay, secret, name);
111+
public static ReverseTcpProxyTunnelClient create(Vertx vertx, NetClient netClient, long minDelay, long maxDelay, String secret, String name) {
112+
return new ReverseTcpProxyTunnelClient(vertx, netClient, minDelay, maxDelay, secret, name);
114113
}
115114

116115
public static ReverseTcpProxyTunnelClient create(Vertx vertx, NetClient netClient, String secret) {
117-
return new ReverseTcpProxyTunnelClient(vertx, netClient, MIN_DELAY_DEFAULT, MAX_DELAY_DEFAULT, HEARTBEAT_DELAY_DEFAULT, secret, generateName());
116+
return new ReverseTcpProxyTunnelClient(vertx, netClient, MIN_DELAY_DEFAULT, MAX_DELAY_DEFAULT, secret, generateName());
118117
}
119118

120119

121120
public static ReverseTcpProxyTunnelClient create(Vertx vertx, NetClient netClient) {
122-
return new ReverseTcpProxyTunnelClient(vertx, netClient, MIN_DELAY_DEFAULT, MAX_DELAY_DEFAULT, HEARTBEAT_DELAY_DEFAULT, SECRET_DEFAULT, generateName());
121+
return new ReverseTcpProxyTunnelClient(vertx, netClient, MIN_DELAY_DEFAULT, MAX_DELAY_DEFAULT, SECRET_DEFAULT, generateName());
123122
}
124123

125124
public static ReverseTcpProxyTunnelClient create(Vertx vertx) {
126-
return new ReverseTcpProxyTunnelClient(vertx, vertx.createNetClient(), MIN_DELAY_DEFAULT, MAX_DELAY_DEFAULT, HEARTBEAT_DELAY_DEFAULT, SECRET_DEFAULT, generateName());
125+
return new ReverseTcpProxyTunnelClient(vertx, vertx.createNetClient(), MIN_DELAY_DEFAULT, MAX_DELAY_DEFAULT, SECRET_DEFAULT, generateName());
127126
}
128127

129128

@@ -135,6 +134,7 @@ protected void addMessageHandler() {
135134
this.onConnected((vertx, netSocket, buffer) -> netSocket.write(encode(TunnelMessageType.OPEN_DATA_PORT,
136135
TunnelMessage.OpenDataPort.newBuilder()
137136
.setSecret(secret)
137+
.setDataProxyHost(dataProxyHost)
138138
.setDataProxyPort(dataProxyPort)
139139
.setDataProxyName(dataProxyName)
140140
.build().toByteArray())));
@@ -149,6 +149,7 @@ protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType t
149149
if (parsed.getSuccess()) {
150150
// 如果认证 + 开通端口成功,那么就需要进行长连接保持,并开启定期心跳。
151151
result = true;
152+
heartbeatDelay = parsed.getHeartbeatDelay();
152153
vertx.setTimer(heartbeatDelay, id -> netSocket.write(encode(TunnelMessageType.HEARTBEAT,
153154
TunnelMessage.Heartbeat.newBuilder().setTimestamp(System.currentTimeMillis()).build().toByteArray())));
154155
} else {
@@ -186,45 +187,55 @@ protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType t
186187
if (ar.succeeded()) {
187188
final NetSocket dataSocket = ar.result();
188189
dataSocket.pause();
190+
// 连接建立成功后,立马发送消息告诉数据服务,我是数据连接,并与用户连接进行绑定
191+
dataSocket.write(Buffer.buffer()
192+
.appendBytes(DATA_CONN_FLAG)
193+
.appendInt(sessionId));
194+
log.debug("{}: data connection {} established, notify data proxy server of current session id {}. wait for backend connection",
195+
dataProxyName,
196+
dataSocket.remoteAddress(),
197+
sessionId);
189198
netClient.connect(backendPort, backendHost).onComplete(rst -> {
190199
if (rst.succeeded()) {
191200
atomicResult.set(rst.succeeded());
192201
final NetSocket backendSocket = rst.result();
193202
backendSocket.pause();
194-
dataSocket.write(Buffer.buffer()
195-
.appendBytes(DATA_CONN_FLAG)
196-
.appendInt(sessionId));
203+
log.debug("{}: backend connection {} established", dataProxyName, backendSocket.remoteAddress());
197204
// 双向生命周期绑定、双向数据转发
198205
dataSocket.closeHandler(v -> {
199-
log.debug("data connection {} closed", dataSocket.remoteAddress());
206+
log.debug("{}: data connection {} closed", dataProxyName, dataSocket.remoteAddress());
200207
backendSocket.close();
201208
}).pipeTo(backendSocket).onFailure(e -> {
202-
log.error("data connection {} pipe to backend connection {} failed, connection will be closed",
209+
log.error("{}: data connection {} pipe to backend connection {} failed, connection will be closed",
210+
dataProxyName,
203211
dataSocket.remoteAddress(), backendSocket.remoteAddress(), e);
204212
dataSocket.close();
205213
});
206214
backendSocket.closeHandler(v -> {
207-
log.debug("backend connection {} closed", backendSocket.remoteAddress());
215+
log.debug("{}: backend connection {} closed", dataProxyName, backendSocket.remoteAddress());
208216
dataSocket.close();
209217
}).pipeTo(dataSocket).onFailure(e -> {
210-
log.error("backend connection {} pipe to data connection {} failed, connection will be closed",
218+
log.error("{}: backend connection {} pipe to data connection {} failed, connection will be closed",
219+
dataProxyName,
211220
backendSocket.remoteAddress(), dataSocket.remoteAddress(), e);
212221
backendSocket.close();
213222
});
214223
backendSocket.resume();
215224
dataSocket.resume();
216-
log.debug("data connection {} bound to backend connection {} for session id {}",
225+
log.debug("{}: data connection {} bound to backend connection {} for session id {}",
226+
dataProxyName,
217227
dataSocket.remoteAddress(),
218228
backendSocket.remoteAddress(),
219229
sessionId);
220230
} else {
221-
log.error("client open backend connection to {}:{} failed",
231+
log.error("{}: client open backend connection to {}:{} failed",
232+
dataProxyName,
222233
backendHost, backendPort, rst.cause());
223234
}
224235
latch.countDown();
225236
});
226237
} else {
227-
log.error("client open data connection to {}:{} failed", dataProxyHost, dataProxyPort, ar.cause());
238+
log.error("{}: client open data connection to {}:{} failed", dataProxyName, dataProxyHost, dataProxyPort, ar.cause());
228239
latch.countDown();
229240
}
230241

src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelServer.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ public class ReverseTcpProxyTunnelServer extends TunnelServer {
4646
private static final Logger log = LoggerFactory.getLogger(ReverseTcpProxyTunnelServer.class);
4747
protected static final char[] ID_CHARACTERS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray();
4848

49-
5049
protected String host = "0.0.0.0"; // 控制服务监听的主机地址
5150
protected int port = 44444; // 控制服务监听的端口
5251
protected int judgeDelay = 30000;// 连接类型的判定延迟,单位毫秒
52+
protected int heartbeatDelay = 5000;// 毫秒
5353
protected Map<NetSocket, DataProxyServer> authedSockets = new ConcurrentHashMap<>();// 授权成功的控制连接与数据服务的对应关系
5454

5555
protected final String secret; // 鉴权密钥
@@ -77,6 +77,12 @@ public ReverseTcpProxyTunnelServer judgeDelay(int judgeDelay) {
7777
return this;
7878
}
7979

80+
81+
public ReverseTcpProxyTunnelServer heartbeatDelay(int heartbeatDelay) {
82+
this.heartbeatDelay = heartbeatDelay;
83+
return this;
84+
}
85+
8086
/**
8187
* 控制连接的处理逻辑
8288
*
@@ -420,6 +426,7 @@ protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType t
420426
TunnelMessage.OpenDataPort parsed = TunnelMessage.OpenDataPort.parseFrom(bodyBytes);
421427
TunnelMessage.OpenDataPortAck.Builder builder = TunnelMessage.OpenDataPortAck
422428
.newBuilder();
429+
builder.setHeartbeatDelay(heartbeatDelay);
423430
if (secret.equals(parsed.getSecret())) {
424431
synchronized (ReverseTcpProxyTunnelServer.class) {
425432
// 判断dataProxyName是否唯一
@@ -431,18 +438,28 @@ protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType t
431438
return result;
432439
}
433440
}
434-
final DataProxyServer dataProxyServer = new DataProxyServer(vertx,
435-
parsed.getDataProxyName(),
436-
parsed.getDataProxyPort(),
437-
netSocket, judgeDelay);
441+
String property = System.getProperty("setDataProxyHost", "false");
442+
log.debug("-DsetDataProxyHost: {}", property);
443+
final DataProxyServer dataProxyServer;
444+
if (Boolean.parseBoolean(property)) {
445+
dataProxyServer = new DataProxyServer(vertx,
446+
parsed.getDataProxyName(), parsed.getDataProxyHost(), parsed.getDataProxyPort(),
447+
netSocket, judgeDelay);
448+
} else {
449+
dataProxyServer = new DataProxyServer(vertx,
450+
parsed.getDataProxyName(),
451+
parsed.getDataProxyPort(),
452+
netSocket, judgeDelay);
453+
}
454+
log.debug("{} will listen on {}:{}", dataProxyServer.name, dataProxyServer.host, dataProxyServer.port);
438455
if (dataProxyServer.startSync()) {
439456
result = true;
440457
builder.setSuccess(result).setMessage("success");
441458
netSocket.write(encode(TunnelMessageType.OPEN_DATA_PORT_ACK,
442459
builder.build().toByteArray()));
443460
authedSockets.put(netSocket, dataProxyServer);
444461
} else {
445-
builder.setSuccess(result).setMessage("fail to open data port " + parsed.getDataProxyPort());
462+
builder.setSuccess(result).setMessage("failed to open data port " + parsed.getDataProxyPort());
446463
netSocket.write(encode(TunnelMessageType.OPEN_DATA_PORT_ACK,
447464
builder.build().toByteArray())).onComplete(ar -> netSocket.close());
448465
}

src/main/java/top/meethigher/proxy/tcp/tunnel/TunnelClient.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public abstract class TunnelClient extends Tunnel {
3939
*/
4040
protected final long maxDelay;
4141

42+
protected String name;
43+
4244
/**
4345
* 内部维护一个长连接 {@code NetSocket}
4446
* <p>
@@ -79,7 +81,7 @@ public TunnelMessageParser decode(final NetSocket socket) {
7981
* @param port 端口
8082
*/
8183
public void connect(final String host, final int port) {
82-
log.debug("client connect {}:{} ...", host, port);
84+
log.debug("{}: client connect {}:{} ...", name, host, port);
8385
netClient.connect(port, host).onComplete(ar -> handleConnectCompleteAsyncResult(ar, host, port));
8486
}
8587

@@ -91,7 +93,7 @@ public void connect(final String host, final int port) {
9193
*/
9294
public void emit(final TunnelMessageType type, final byte[] bodyBytes) {
9395
if (netSocket == null) {
94-
log.warn("socket is closed");
96+
log.warn("{}: socket is closed", name);
9597
} else {
9698
netSocket.write(encode(type, bodyBytes));
9799
}
@@ -113,15 +115,16 @@ protected void handleConnectCompleteAsyncResult(final AsyncResult<NetSocket> ar,
113115
this.netSocket = socket;
114116
socket.pause();
115117
socket.closeHandler(v -> {
116-
log.warn("closed {} -- {}, after {} ms will reconnect",
118+
log.warn("{}: closed {} -- {}, after {} ms will reconnect",
119+
name,
117120
socket.localAddress(),
118121
socket.remoteAddress(),
119122
reconnectDelay);
120123
this.netSocket = null;
121124
reconnect(host, port);
122125
});
123126
socket.handler(decode(socket));
124-
log.debug("client connected {}:{}", host, port);
127+
log.info("{}: client connected {}:{}", name, host, port);
125128

126129
// 执行连接成功的Handler
127130
TunnelHandler tunnelHandler = tunnelHandlers.get(null);
@@ -131,7 +134,8 @@ protected void handleConnectCompleteAsyncResult(final AsyncResult<NetSocket> ar,
131134
socket.resume();
132135
} else {
133136
Throwable e = ar.cause();
134-
log.error("client connect {}:{} error, after {} ms will reconnect",
137+
log.error("{}: client connect {}:{} error, after {} ms will reconnect",
138+
name,
135139
host,
136140
port,
137141
reconnectDelay,
@@ -153,7 +157,7 @@ protected void setReconnectDelay(final long delay) {
153157
*/
154158
protected void reconnect(final String host, final int port) {
155159
vertx.setTimer(reconnectDelay, id -> {
156-
log.debug("client reconnect {}:{} ...", host, port);
160+
log.debug("{}: client reconnect {}:{} ...", name, host, port);
157161
connect(host, port);
158162
setReconnectDelay(Math.min(reconnectDelay * 2, this.maxDelay));
159163
});

src/main/java/top/meethigher/proxy/tcp/tunnel/handler/AbstractTunnelHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ public void handle(Vertx vertx, NetSocket netSocket, Buffer buffer) {
2525
vertx.executeBlocking((Callable<Void>) () -> {
2626
TunnelMessageCodec.DecodedMessage decodedMessage = TunnelMessageCodec.decode(buffer);
2727
TunnelMessageType type = TunnelMessageType.fromCode(decodedMessage.type);
28-
log.debug("received message type = {}, doHandle ...", type);
28+
log.debug("received message type = {} from {}, doHandle ...", type, netSocket.remoteAddress());
2929
boolean result = doHandle(vertx, netSocket, type, decodedMessage.body);
30-
log.debug("received message type = {}, doHandle result = {}", type, result);
30+
log.debug("received message type = {} from {}, doHandle result = {}", type, netSocket.remoteAddress(), result);
3131
return null;
3232
});
3333
}

0 commit comments

Comments
 (0)