10
10
import io .vertx .core .net .SocketAddress ;
11
11
import org .slf4j .Logger ;
12
12
import org .slf4j .LoggerFactory ;
13
+ import top .meethigher .proxy .LoadBalancer ;
14
+ import top .meethigher .proxy .NetAddress ;
13
15
16
+ import java .util .ArrayList ;
17
+ import java .util .List ;
14
18
import java .util .concurrent .ThreadLocalRandom ;
15
19
16
20
/**
@@ -32,24 +36,32 @@ public class ReverseTcpProxy {
32
36
protected final Handler <NetSocket > connectHandler ;
33
37
protected final NetServer netServer ;
34
38
protected final NetClient netClient ;
35
- protected final String targetHost ;
36
- protected final int targetPort ;
39
+ protected final LoadBalancer < NetAddress > lb ;
40
+ protected final List < NetAddress > netAddresses ;
37
41
protected final String name ;
38
42
39
43
protected ReverseTcpProxy (NetServer netServer , NetClient netClient ,
40
- String targetHost , int targetPort , String name ) {
44
+ LoadBalancer <NetAddress > loadBalancer ,
45
+ List <NetAddress > netAddresses ,
46
+ String name ) {
41
47
this .name = name ;
42
- this .targetHost = targetHost ;
43
- this .targetPort = targetPort ;
48
+ this .lb = loadBalancer ;
49
+ this .netAddresses = netAddresses ;
44
50
this .netServer = netServer ;
45
51
this .netClient = netClient ;
46
52
this .connectHandler = sourceSocket -> {
47
53
// 暂停流读取
48
54
sourceSocket .pause ();
49
55
SocketAddress sourceRemote = sourceSocket .remoteAddress ();
50
56
SocketAddress sourceLocal = sourceSocket .localAddress ();
51
- log .debug ("source {} -- {} connected" , sourceLocal , sourceRemote );
52
57
sourceSocket .closeHandler (v -> log .debug ("source {} -- {} closed" , sourceLocal , sourceRemote ));
58
+ NetAddress next = lb .next ();
59
+ String targetHost = next .getHost ();
60
+ int targetPort = next .getPort ();
61
+ log .debug ("source {} -- {} connected. lb [{}] next target {}" , sourceLocal , sourceRemote ,
62
+ lb .name (),
63
+ next
64
+ );
53
65
netClient .connect (targetPort , targetHost )
54
66
.onFailure (e -> {
55
67
log .error ("failed to connect to {}:{}" , targetHost , targetPort , e );
@@ -61,7 +73,7 @@ protected ReverseTcpProxy(NetServer netServer, NetClient netClient,
61
73
SocketAddress targetRemote = targetSocket .remoteAddress ();
62
74
SocketAddress targetLocal = targetSocket .localAddress ();
63
75
log .debug ("target {} -- {} connected" , targetLocal , targetRemote );
64
-
76
+
65
77
// feat: v1.0.5以前的版本,在closeHandler里面,将对端连接也关闭。比如targetSocket关闭时,则将sourceSocket也关闭。
66
78
// 结果导致在转发短连接时,出现了bug。参考https://github.com/meethigher/tcp-reverse-proxy/issues/6
67
79
targetSocket .closeHandler (v -> log .debug ("target {} -- {} closed" , targetLocal , targetRemote ));
@@ -88,20 +100,56 @@ protected ReverseTcpProxy(NetServer netServer, NetClient netClient,
88
100
89
101
public static ReverseTcpProxy create (Vertx vertx ,
90
102
String targetHost , int targetPort , String name ) {
91
- return new ReverseTcpProxy (vertx .createNetServer (), vertx .createNetClient (), targetHost , targetPort , name );
103
+ List <NetAddress > list = new ArrayList <>();
104
+ TcpRoundRobinLoadBalancer lb = TcpRoundRobinLoadBalancer .create (list );
105
+ return new ReverseTcpProxy (
106
+ vertx .createNetServer (),
107
+ vertx .createNetClient (),
108
+ lb ,
109
+ list ,
110
+ name
111
+ ).addNode (new NetAddress (targetHost , targetPort ));
92
112
}
93
113
94
114
public static ReverseTcpProxy create (Vertx vertx ,
95
115
String targetHost , int targetPort ) {
96
- return new ReverseTcpProxy (vertx .createNetServer (), vertx .createNetClient (), targetHost , targetPort , generateName ());
116
+ List <NetAddress > list = new ArrayList <>();
117
+ return new ReverseTcpProxy (
118
+ vertx .createNetServer (),
119
+ vertx .createNetClient (),
120
+ TcpRoundRobinLoadBalancer .create (list ),
121
+ list ,
122
+ generateName ()
123
+ ).addNode (new NetAddress (targetHost , targetPort ));
97
124
}
98
125
99
126
public static ReverseTcpProxy create (NetServer netServer , NetClient netClient , String targetHost , int targetPort ) {
100
- return new ReverseTcpProxy (netServer , netClient , targetHost , targetPort , generateName ());
127
+ List <NetAddress > list = new ArrayList <>();
128
+ return new ReverseTcpProxy (
129
+ netServer ,
130
+ netClient ,
131
+ TcpRoundRobinLoadBalancer .create (list ),
132
+ list ,
133
+ generateName ()
134
+ ).addNode (new NetAddress (targetHost , targetPort ));
101
135
}
102
136
103
137
public static ReverseTcpProxy create (NetServer netServer , NetClient netClient , String targetHost , int targetPort , String name ) {
104
- return new ReverseTcpProxy (netServer , netClient , targetHost , targetPort , name );
138
+ List <NetAddress > list = new ArrayList <>();
139
+ return new ReverseTcpProxy (
140
+ netServer ,
141
+ netClient ,
142
+ TcpRoundRobinLoadBalancer .create (list ),
143
+ list ,
144
+ name
145
+ ).addNode (new NetAddress (targetHost , targetPort ));
146
+ }
147
+
148
+ public static ReverseTcpProxy create (NetServer netServer , NetClient netClient ,
149
+ LoadBalancer <NetAddress > loadBalancer ,
150
+ List <NetAddress > netAddresses ,
151
+ String name ) {
152
+ return new ReverseTcpProxy (netServer , netClient , loadBalancer , netAddresses , name );
105
153
}
106
154
107
155
public ReverseTcpProxy port (int port ) {
@@ -114,8 +162,15 @@ public ReverseTcpProxy host(String host) {
114
162
return this ;
115
163
}
116
164
165
+ public ReverseTcpProxy addNode (NetAddress netAddress ) {
166
+ if (!netAddresses .contains (netAddress )) {
167
+ netAddresses .add (netAddress );
168
+ }
169
+ return this ;
170
+ }
171
+
117
172
118
- protected static String generateName () {
173
+ public static String generateName () {
119
174
final String prefix = ReverseTcpProxy .class .getSimpleName () + "-" ;
120
175
try {
121
176
// 池号对于虚拟机来说是全局的,以避免在类加载器范围的环境中池号重叠
@@ -135,12 +190,15 @@ protected static String generateName() {
135
190
}
136
191
137
192
public void start () {
193
+ if (netAddresses .size () <= 0 ) {
194
+ throw new IllegalStateException ("netAddresses size must be greater than 0" );
195
+ }
138
196
netServer .connectHandler (connectHandler ).exceptionHandler (e -> log .error ("connect failed" , e ));
139
197
Future <NetServer > listenFuture = netServer .listen (sourcePort , sourceHost );
140
198
141
199
Handler <AsyncResult <NetServer >> asyncResultHandler = ar -> {
142
200
if (ar .succeeded ()) {
143
- log .info ("{} started on {}:{}" , name , sourceHost , sourcePort );
201
+ log .info ("{} started on {}:{}\n LB-Mode: {} \n {} " , name , sourceHost , sourcePort , lb . name (), netAddresses );
144
202
} else {
145
203
Throwable e = ar .cause ();
146
204
log .error ("{} start failed" , name , e );
0 commit comments