9
9
import io .vertx .core .net .NetSocket ;
10
10
import org .slf4j .Logger ;
11
11
import org .slf4j .LoggerFactory ;
12
+ import top .meethigher .proxy .tcp .tunnel .codec .TunnelMessageCodec ;
12
13
import top .meethigher .proxy .tcp .tunnel .codec .TunnelMessageType ;
13
14
import top .meethigher .proxy .tcp .tunnel .handler .AbstractTunnelHandler ;
14
15
import top .meethigher .proxy .tcp .tunnel .handler .TunnelHandler ;
15
16
import top .meethigher .proxy .tcp .tunnel .proto .TunnelMessage ;
17
+ import top .meethigher .proxy .tcp .tunnel .utils .IdGenerator ;
18
+ import top .meethigher .proxy .tcp .tunnel .utils .UserConnection ;
16
19
20
+ import java .util .ArrayList ;
17
21
import java .util .Map ;
18
22
import java .util .concurrent .ConcurrentHashMap ;
19
23
import java .util .concurrent .CountDownLatch ;
@@ -35,7 +39,6 @@ public class ReverseTcpProxyTunnelServer extends TunnelServer {
35
39
36
40
private static final Logger log = LoggerFactory .getLogger (ReverseTcpProxyTunnelServer .class );
37
41
protected static final char [] ID_CHARACTERS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" .toCharArray ();
38
- protected static final String SECRET_DEFAULT = "0123456789" ;
39
42
40
43
41
44
protected String host = "0.0.0.0" ;
@@ -104,7 +107,7 @@ protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType t
104
107
TunnelMessage .OpenDataPortAck .Builder builder = TunnelMessage .OpenDataPortAck
105
108
.newBuilder ();
106
109
if (secret .equals (parsed .getSecret ())) {
107
- final DataProxyServer dataProxyServer = new DataProxyServer (vertx , parsed .getDataProxyName (), parsed .getDataProxyPort ());
110
+ final DataProxyServer dataProxyServer = new DataProxyServer (vertx , parsed .getDataProxyName (), parsed .getDataProxyPort (), netSocket );
108
111
if (dataProxyServer .startSync ()) {
109
112
result = true ;
110
113
builder .setSuccess (result ).setMessage ("success" );
@@ -126,7 +129,21 @@ protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType t
126
129
netSocket .write (encode (TunnelMessageType .OPEN_DATA_PORT_ACK ,
127
130
ack .toByteArray ())).onComplete (ar -> netSocket .close ());
128
131
}
129
- } catch (Exception e ) {
132
+ } catch (Exception ignore ) {
133
+ }
134
+ return result ;
135
+ }
136
+ });
137
+
138
+ // 监听数据连接响应事件
139
+ this .on (TunnelMessageType .OPEN_DATA_CONN_ACK , new AbstractTunnelHandler () {
140
+ @ Override
141
+ protected boolean doHandle (Vertx vertx , NetSocket netSocket , TunnelMessageType type , byte [] bodyBytes ) {
142
+ boolean result = false ;
143
+ try {
144
+ TunnelMessage .OpenDataConnAck openDataConnAck = TunnelMessage .OpenDataConnAck .parseFrom (bodyBytes );
145
+ result = openDataConnAck .getSuccess ();
146
+ } catch (Exception ignore ) {
130
147
}
131
148
return result ;
132
149
}
@@ -194,30 +211,167 @@ protected static class DataProxyServer {
194
211
protected final Vertx vertx ;
195
212
protected final NetServer netServer ;
196
213
protected final String name ;
197
- protected final Handler <NetSocket > connectHandler ;
198
-
199
214
protected final String host ;
200
215
protected final int port ;
216
+ protected final NetSocket controlSocket ;
217
+ protected final int judgeDelay ;// 连接类型的判定延迟,单位毫秒
218
+ // 等待与数据连接进行配对的用户连接
219
+ protected final Map <Integer , UserConnection > unboundUserConnections = new ConcurrentHashMap <>();
220
+
201
221
202
222
public DataProxyServer (Vertx vertx , String name ,
203
- String host , int port ) {
223
+ String host , int port ,
224
+ NetSocket controlSocket ,
225
+ int judgeDelay ) {
204
226
this .vertx = vertx ;
205
227
this .name = name ;
206
228
this .host = host ;
207
229
this .port = port ;
230
+ this .controlSocket = controlSocket ;
231
+ this .judgeDelay = judgeDelay ;
208
232
this .netServer = this .vertx .createNetServer ();
209
- this .connectHandler = socket -> {
210
- };
211
233
}
212
234
213
235
public DataProxyServer (Vertx vertx , String name ,
214
- int port ) {
215
- this (vertx , name , "0.0.0.0" , port );
236
+ int port ,
237
+ NetSocket controlSocket ) {
238
+ this (vertx , name , "0.0.0.0" , port , controlSocket , 30000 );
239
+ }
240
+
241
+ /**
242
+ * 连接有两种,分别为用户连接和数据连接。
243
+ * <p>
244
+ * 用户连接由用户主动发起,对我来说是不可控的。
245
+ * <p>
246
+ * 数据连接由 {@code TunnelClient} 主动发起,对我来说是可控的。
247
+ * <p>
248
+ * 由于无法直接通过TCP连接来判定类型,因此我需要在编写数据连接时,让 {@code TunnelClient}连接 {@code TunnelServer}成功后主动发送一条特定的消息,格式为:
249
+ * <pre>4字节标识码+4字节唯一编号</pre>
250
+ * <p>
251
+ * 通过标识码判定是用户连接还是数据连接,通过唯一编号判定用户连接和数据连接的对应关系
252
+ *
253
+ * @param socket 连接
254
+ */
255
+ protected void handleConnect (NetSocket socket ) {
256
+ socket .pause ();
257
+ /**
258
+ * 连接的判定,分为两种情况。
259
+ * 第一种:用户建立连接后,主动发送数据请求,此时直接通过数据包即可判定用户连接还是数据连接。如HTTP
260
+ * 第二种:用户建立连接后,等待服务端主动发送请求,此时就需要使用到延迟判定他是一个数据连接。如SSH
261
+ */
262
+ final long timerId = vertx .setTimer (judgeDelay , id -> {
263
+ handleUserConnection (socket , null , -1 );
264
+ });
265
+ // 创建缓冲区
266
+ final Buffer buf = Buffer .buffer ();
267
+ socket .handler (buffer -> {
268
+ buf .appendBuffer (buffer );
269
+ if (buf .length () < 8 ) {
270
+ return ;
271
+ }
272
+ if (buf .getByte (0 ) == Tunnel .DATA_CONN_FLAG [0 ]
273
+ && buf .getByte (1 ) == Tunnel .DATA_CONN_FLAG [1 ]
274
+ && buf .getByte (2 ) == Tunnel .DATA_CONN_FLAG [2 ]
275
+ && buf .getByte (3 ) == Tunnel .DATA_CONN_FLAG [3 ]
276
+ ) {
277
+ handleDataConnection (socket , buf , timerId );
278
+ } else {
279
+ handleUserConnection (socket , buf , timerId );
280
+ }
281
+ });
282
+ log .debug ("{}: connection {} established, is it a data connection or user connection?" , name , socket .remoteAddress ());
283
+ socket .resume ();
284
+ }
285
+
286
+ /**
287
+ * 数据连接的处理逻辑
288
+ *
289
+ * @param socket 连接
290
+ * @param buf 数据
291
+ * @param timerId 延时判定数据连接的定时器
292
+ */
293
+ protected void handleDataConnection (NetSocket socket , Buffer buf , long timerId ) {
294
+ log .debug ("{}: oh, connection {} is a data connection!" , name , socket .remoteAddress ());
295
+ // 取消延迟判定的逻辑
296
+ if (timerId != -1 ) {
297
+ vertx .cancelTimer (timerId );
298
+ }
299
+ // 数据连接
300
+ int sessionId = buf .getInt (4 );
301
+ UserConnection userConn = unboundUserConnections .remove (sessionId );
302
+ if (userConn != null ) {
303
+ bindConnections (userConn , socket , sessionId );
304
+ } else {
305
+ log .debug ("{}: invalid session id {}, connection {} will be closed" , name , sessionId , socket .remoteAddress ());
306
+ socket .close ();
307
+ }
308
+ }
309
+
310
+ /**
311
+ * 用户连接的处理逻辑
312
+ *
313
+ * @param socket 连接
314
+ * @param buf 数据
315
+ * @param timerId 延时判定数据连接的定时器
316
+ */
317
+ protected void handleUserConnection (NetSocket socket , Buffer buf , long timerId ) {
318
+ log .debug ("{}: oh, connection {} is a user connection!" , name , socket .remoteAddress ());
319
+ // 取消延迟判定的逻辑
320
+ if (timerId != -1 ) {
321
+ vertx .cancelTimer (timerId );
322
+ }
323
+ // 用户连接
324
+ int sessionId = IdGenerator .nextId ();
325
+ UserConnection userConn = new UserConnection (sessionId , socket , new ArrayList <>());
326
+ if (buf != null ) {
327
+ userConn .buffers .add (buf .copy ());
328
+ }
329
+ unboundUserConnections .put (sessionId , userConn );
330
+ log .debug ("{}: user connection {} create session id {}, wait for data connection ..." ,
331
+ name , socket .remoteAddress (), sessionId );
332
+ // 通过控制连接通知TunnelClient主动建立数据连接。服务端不需要通知客户端需要连接的端口,因为数据端口的启动是由客户端通知服务端开启的。
333
+ controlSocket .write (TunnelMessageCodec .encode (TunnelMessageType .OPEN_DATA_CONN .code (),
334
+ TunnelMessage .OpenDataConn .newBuilder ().setSessionId (sessionId ).build ().toByteArray ()));
335
+ }
336
+
337
+ /**
338
+ * 将用户连接与数据连接进行双向生命周期绑定、双向数据转发
339
+ *
340
+ * @param userConn 用户连接信息,含socket
341
+ * @param dataSocket 数据连接socket
342
+ * @param sessionId 绑定的会话编号
343
+ */
344
+ protected void bindConnections (UserConnection userConn , NetSocket dataSocket , int sessionId ) {
345
+ NetSocket userSocket = userConn .netSocket ;
346
+ // 双向生命周期绑定、双向数据转发
347
+ userSocket .closeHandler (v -> {
348
+ log .debug ("{}: user connection {} closed" , name , userSocket .remoteAddress ());
349
+ dataSocket .close ();
350
+ }).pipeTo (dataSocket ).onFailure (e -> {
351
+ log .error ("{}: user connection {} pipe to data connection {} failed, connection will be closed" ,
352
+ name , userSocket .remoteAddress (), dataSocket .remoteAddress (), e );
353
+ dataSocket .close ();
354
+ });
355
+ dataSocket .closeHandler (v -> {
356
+ log .debug ("{}: data connection {} closed" , name , dataSocket .remoteAddress ());
357
+ userSocket .close ();
358
+ }).pipeTo (userSocket ).onFailure (e -> {
359
+ log .error ("{}: data connection {} pipe to user connection {} failed, connection will be closed" ,
360
+ name , dataSocket .remoteAddress (), userSocket .remoteAddress (), e );
361
+ userSocket .close ();
362
+ });
363
+ // 将用户连接中的缓存数据发出。
364
+ userConn .buffers .forEach (dataSocket ::write );
365
+ log .debug ("{}: data connection {} bound to user connection {} for session id {}" ,
366
+ name ,
367
+ dataSocket .remoteAddress (),
368
+ userSocket .remoteAddress (),
369
+ sessionId );
216
370
}
217
371
218
372
public void start () {
219
373
this .netServer
220
- .connectHandler (this . connectHandler )
374
+ .connectHandler (this :: handleConnect )
221
375
.listen (port , host )
222
376
.onComplete (ar -> {
223
377
if (ar .succeeded ()) {
@@ -239,7 +393,7 @@ public boolean startSync() {
239
393
CountDownLatch latch = new CountDownLatch (1 );
240
394
AtomicBoolean success = new AtomicBoolean (false );
241
395
this .netServer
242
- .connectHandler (this . connectHandler )
396
+ .connectHandler (this :: handleConnect )
243
397
.listen (port , host )
244
398
.onComplete (ar -> {
245
399
if (ar .succeeded ()) {
0 commit comments