Skip to content

Commit 5324b62

Browse files
committed
feat: 1. 添加udp反向代理,并支持lb 2. 调整tcp反向代理lb相关使用
1 parent 289634c commit 5324b62

File tree

7 files changed

+218
-43
lines changed

7 files changed

+218
-43
lines changed

src/main/java/top/meethigher/proxy/LoadBalancer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package top.meethigher.proxy;
22

3+
import java.util.List;
4+
35
/**
46
* 负载均衡策略
57
*
@@ -10,4 +12,6 @@ public interface LoadBalancer<T> {
1012
T next();
1113

1214
String name();
15+
16+
List<T> all();
1317
}

src/main/java/top/meethigher/proxy/tcp/TcpRoundRobinLoadBalancer.java renamed to src/main/java/top/meethigher/proxy/RoundRobinLoadBalancer.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
package top.meethigher.proxy.tcp;
2-
3-
import top.meethigher.proxy.LoadBalancer;
4-
import top.meethigher.proxy.NetAddress;
1+
package top.meethigher.proxy;
52

63
import java.util.List;
74
import java.util.concurrent.atomic.AtomicInteger;
@@ -12,16 +9,20 @@
129
* @author <a href="https://meethigher.top">chenchuancheng</a>
1310
* @since 2025/07/26 13:41
1411
*/
15-
public class TcpRoundRobinLoadBalancer implements LoadBalancer<NetAddress> {
12+
public class RoundRobinLoadBalancer implements LoadBalancer<NetAddress> {
1613

1714
private final List<NetAddress> nodes;
1815

1916
private final AtomicInteger idx = new AtomicInteger(0);
2017

21-
private final String name = "TcpRoundRobinLoadBalancer";
18+
private final String name;
2219

23-
private TcpRoundRobinLoadBalancer(List<NetAddress> nodes) {
20+
private RoundRobinLoadBalancer(List<NetAddress> nodes) {
21+
if (nodes.size() <= 0) {
22+
throw new IllegalStateException("nodes size must be greater than 0");
23+
}
2424
this.nodes = nodes;
25+
this.name = RoundRobinLoadBalancer.class.getSimpleName();
2526
}
2627

2728

@@ -38,7 +39,12 @@ public String name() {
3839
return name;
3940
}
4041

41-
public static TcpRoundRobinLoadBalancer create(List<NetAddress> nodes) {
42-
return new TcpRoundRobinLoadBalancer(nodes);
42+
@Override
43+
public List<NetAddress> all() {
44+
return nodes;
45+
}
46+
47+
public static RoundRobinLoadBalancer create(List<NetAddress> nodes) {
48+
return new RoundRobinLoadBalancer(nodes);
4349
}
4450
}

src/main/java/top/meethigher/proxy/tcp/ReverseTcpProxy.java

Lines changed: 15 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.slf4j.LoggerFactory;
1111
import top.meethigher.proxy.LoadBalancer;
1212
import top.meethigher.proxy.NetAddress;
13+
import top.meethigher.proxy.RoundRobinLoadBalancer;
1314

1415
import java.util.ArrayList;
1516
import java.util.List;
@@ -35,16 +36,13 @@ public class ReverseTcpProxy {
3536
protected final NetServer netServer;
3637
protected final NetClient netClient;
3738
protected final LoadBalancer<NetAddress> lb;
38-
protected final List<NetAddress> netAddresses;
3939
protected final String name;
4040

4141
protected ReverseTcpProxy(NetServer netServer, NetClient netClient,
4242
LoadBalancer<NetAddress> loadBalancer,
43-
List<NetAddress> netAddresses,
4443
String name) {
4544
this.name = name;
4645
this.lb = loadBalancer;
47-
this.netAddresses = netAddresses;
4846
this.netServer = netServer;
4947
this.netClient = netClient;
5048
this.connectHandler = sourceSocket -> {
@@ -105,55 +103,54 @@ protected ReverseTcpProxy(NetServer netServer, NetClient netClient,
105103
public static ReverseTcpProxy create(Vertx vertx,
106104
String targetHost, int targetPort, String name) {
107105
List<NetAddress> list = new ArrayList<>();
108-
TcpRoundRobinLoadBalancer lb = TcpRoundRobinLoadBalancer.create(list);
106+
RoundRobinLoadBalancer lb = RoundRobinLoadBalancer.create(list);
107+
list.add(new NetAddress(targetHost, targetPort));
109108
return new ReverseTcpProxy(
110109
vertx.createNetServer(),
111110
vertx.createNetClient(),
112111
lb,
113-
list,
114112
name
115-
).addNode(new NetAddress(targetHost, targetPort));
113+
);
116114
}
117115

118116
public static ReverseTcpProxy create(Vertx vertx,
119117
String targetHost, int targetPort) {
120118
List<NetAddress> list = new ArrayList<>();
119+
list.add(new NetAddress(targetHost, targetPort));
121120
return new ReverseTcpProxy(
122121
vertx.createNetServer(),
123122
vertx.createNetClient(),
124-
TcpRoundRobinLoadBalancer.create(list),
125-
list,
123+
RoundRobinLoadBalancer.create(list),
126124
generateName()
127-
).addNode(new NetAddress(targetHost, targetPort));
125+
);
128126
}
129127

130128
public static ReverseTcpProxy create(NetServer netServer, NetClient netClient, String targetHost, int targetPort) {
131129
List<NetAddress> list = new ArrayList<>();
130+
list.add(new NetAddress(targetHost, targetPort));
132131
return new ReverseTcpProxy(
133132
netServer,
134133
netClient,
135-
TcpRoundRobinLoadBalancer.create(list),
136-
list,
134+
RoundRobinLoadBalancer.create(list),
137135
generateName()
138-
).addNode(new NetAddress(targetHost, targetPort));
136+
);
139137
}
140138

141139
public static ReverseTcpProxy create(NetServer netServer, NetClient netClient, String targetHost, int targetPort, String name) {
142140
List<NetAddress> list = new ArrayList<>();
141+
list.add(new NetAddress(targetHost, targetPort));
143142
return new ReverseTcpProxy(
144143
netServer,
145144
netClient,
146-
TcpRoundRobinLoadBalancer.create(list),
147-
list,
145+
RoundRobinLoadBalancer.create(list),
148146
name
149-
).addNode(new NetAddress(targetHost, targetPort));
147+
);
150148
}
151149

152150
public static ReverseTcpProxy create(NetServer netServer, NetClient netClient,
153151
LoadBalancer<NetAddress> loadBalancer,
154-
List<NetAddress> netAddresses,
155152
String name) {
156-
return new ReverseTcpProxy(netServer, netClient, loadBalancer, netAddresses, name);
153+
return new ReverseTcpProxy(netServer, netClient, loadBalancer, name);
157154
}
158155

159156
public ReverseTcpProxy port(int port) {
@@ -166,13 +163,6 @@ public ReverseTcpProxy host(String host) {
166163
return this;
167164
}
168165

169-
public ReverseTcpProxy addNode(NetAddress netAddress) {
170-
if (!netAddresses.contains(netAddress)) {
171-
netAddresses.add(netAddress);
172-
}
173-
return this;
174-
}
175-
176166

177167
public static String generateName() {
178168
final String prefix = ReverseTcpProxy.class.getSimpleName() + "-";
@@ -194,14 +184,11 @@ public static String generateName() {
194184
}
195185

196186
public void start() {
197-
if (netAddresses.size() <= 0) {
198-
throw new IllegalStateException("netAddresses size must be greater than 0");
199-
}
200187
netServer.connectHandler(connectHandler)
201188
.exceptionHandler(e -> log.error("{} socket errors happening before the connection is passed to the connectHandler", name, e))
202189
.listen(sourcePort, sourceHost)
203190
.onFailure(e -> log.error("{} start failed", name, e))
204-
.onSuccess(v -> log.info("{} started on {}:{}\nLB-Mode: {}\n {}", name, sourceHost, sourcePort, lb.name(), netAddresses));
191+
.onSuccess(v -> log.info("{} started on {}:{}\nLB-Mode: {}\n {}", name, sourceHost, sourcePort, lb.name(), lb.all()));
205192
}
206193

207194
public void stop() {
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package top.meethigher.proxy.udp;
2+
3+
import io.vertx.core.Vertx;
4+
import io.vertx.core.datagram.DatagramSocket;
5+
import io.vertx.core.net.SocketAddress;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import top.meethigher.proxy.LoadBalancer;
9+
import top.meethigher.proxy.NetAddress;
10+
11+
import java.util.concurrent.ThreadLocalRandom;
12+
13+
/**
14+
* UDP反向代理
15+
*
16+
* @author <a href="https://meethigher.top">chenchuancheng</a>
17+
* @see <a href="https://github.com/meethigher/tcp-reverse-proxy/issues/24">支持UDP反代 · Issue #24 · meethigher/tcp-reverse-proxy</a>
18+
* @since 2025/08/10 19:15
19+
*/
20+
public class ReverseUdpProxy {
21+
private static final Logger log = LoggerFactory.getLogger(ReverseUdpProxy.class);
22+
23+
protected static final char[] ID_CHARACTERS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray();
24+
25+
// 单位毫秒
26+
public static final long DST_TIMEOUT_DEFAULT = 60000;
27+
28+
protected String sourceHost = "0.0.0.0";
29+
protected int sourcePort = 999;
30+
31+
protected DatagramSocket src;
32+
33+
protected final Vertx vertx;
34+
protected final long dstTimeout;
35+
protected final LoadBalancer<NetAddress> lb;
36+
protected final String name;
37+
38+
protected ReverseUdpProxy(Vertx vertx, long dstTimeout,
39+
LoadBalancer<NetAddress> loadBalancer,
40+
String name) {
41+
this.vertx = vertx;
42+
this.dstTimeout = dstTimeout;
43+
this.lb = loadBalancer;
44+
this.name = name;
45+
}
46+
47+
public ReverseUdpProxy host(String host) {
48+
this.sourceHost = host;
49+
return this;
50+
}
51+
52+
public ReverseUdpProxy port(int port) {
53+
this.sourcePort = port;
54+
return this;
55+
}
56+
57+
public void start() {
58+
src = vertx.createDatagramSocket();
59+
String srcLocal = sourceHost + ":" + sourcePort;
60+
src.handler(srcPk -> {
61+
NetAddress dstRemote = lb.next();
62+
SocketAddress srcRemote = srcPk.sender();
63+
log.debug("source {} -- {} connected. lb [{}] next target {}", srcRemote, srcLocal, lb.name(), dstRemote);
64+
DatagramSocket dst = vertx.createDatagramSocket();
65+
66+
final long timerId = vertx.setTimer(dstTimeout, id -> dst.close());
67+
dst.handler(dstPk -> {
68+
SocketAddress sender = srcPk.sender();
69+
src.send(dstPk.data(), sender.port(), sender.host()).onComplete(ar -> {
70+
if (ar.succeeded()) {
71+
log.debug("target {} -- {} pipe to source {} -- {} succeeded",
72+
dstRemote, dst.localAddress(), srcLocal, srcRemote);
73+
} else {
74+
log.error("target {} -- {} pipe to source {} -- {} failed",
75+
dstRemote, dst.localAddress(), srcLocal, srcRemote, ar.cause());
76+
}
77+
vertx.cancelTimer(timerId);
78+
dst.close();
79+
});
80+
});
81+
dst.send(srcPk.data(), dstRemote.getPort(), dstRemote.getHost())
82+
.onFailure(e -> log.error("source {} -- {} pipe to target {} -- {} failed",
83+
srcRemote, srcLocal, dst.localAddress(), dstRemote, e))
84+
.onSuccess(v -> log.info("sourcce {} -- {} pipe to target {} -- {} succeeded",
85+
srcRemote, srcLocal, dst.localAddress(), dstRemote));
86+
87+
88+
});
89+
src.listen(this.sourcePort, this.sourceHost)
90+
.onFailure(e -> log.error("{} start failed", name, e))
91+
.onSuccess(v -> log.info("{} started on {}:{}\nLB-Mode: {}\n {}",
92+
name, sourceHost, sourcePort, lb.name(), lb.all()));
93+
}
94+
95+
public void stop() {
96+
if (src != null) {
97+
src.close();
98+
log.info("{} closed", name);
99+
}
100+
}
101+
102+
103+
public static ReverseUdpProxy create(Vertx vertx, long dstTimeout,
104+
LoadBalancer<NetAddress> loadBalancer,
105+
String name) {
106+
return new ReverseUdpProxy(vertx, dstTimeout, loadBalancer, name);
107+
}
108+
109+
public static ReverseUdpProxy create(Vertx vertx, LoadBalancer<NetAddress> loadBalancer) {
110+
return create(vertx, DST_TIMEOUT_DEFAULT, loadBalancer, generateName());
111+
}
112+
113+
public static ReverseUdpProxy create(Vertx vertx, long dstTimeout, LoadBalancer<NetAddress> loadBalancer) {
114+
return create(vertx, dstTimeout, loadBalancer, generateName());
115+
}
116+
117+
118+
public static String generateName() {
119+
final String prefix = ReverseUdpProxy.class.getSimpleName() + "-";
120+
try {
121+
// 池号对于虚拟机来说是全局的,以避免在类加载器范围的环境中池号重叠
122+
synchronized (System.getProperties()) {
123+
final String next = String.valueOf(Integer.getInteger(ReverseUdpProxy.class.getName() + ".name", 0) + 1);
124+
System.setProperty(ReverseUdpProxy.class.getName() + ".name", next);
125+
return prefix + next;
126+
}
127+
} catch (Exception e) {
128+
final ThreadLocalRandom random = ThreadLocalRandom.current();
129+
final StringBuilder sb = new StringBuilder(prefix);
130+
for (int i = 0; i < 4; i++) {
131+
sb.append(ID_CHARACTERS[random.nextInt(62)]);
132+
}
133+
return sb.toString();
134+
}
135+
}
136+
137+
}

src/test/java/top/meethigher/proxy/tcp/ReverseTcpProxyTest.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.vertx.core.net.NetServer;
66
import org.junit.Test;
77
import top.meethigher.proxy.NetAddress;
8+
import top.meethigher.proxy.RoundRobinLoadBalancer;
89

910
import java.util.ArrayList;
1011
import java.util.List;
@@ -23,16 +24,20 @@ public void testVertxTCPReverseProxy() throws Exception {
2324

2425

2526
@Test
26-
public void testLb() {
27+
public void testLb() throws Exception {
2728
Vertx vertx = Vertx.vertx();
2829
NetServer netServer = vertx.createNetServer();
2930
NetClient netClient = vertx.createNetClient();
3031
List<NetAddress> list = new ArrayList<>();
31-
ReverseTcpProxy.create(netServer, netClient, TcpRoundRobinLoadBalancer.create(list), list, ReverseTcpProxy.generateName())
32-
.addNode(new NetAddress("10.0.0.20", 22))
33-
.addNode(new NetAddress("10.0.0.30", 22))
32+
list.add(new NetAddress("10.0.0.20", 22));
33+
list.add(new NetAddress("10.0.0.30", 22));
34+
ReverseTcpProxy.create(netServer, netClient, RoundRobinLoadBalancer.create(list), ReverseTcpProxy.generateName())
3435
.start();
3536

37+
TimeUnit.SECONDS.sleep(10);
38+
39+
list.add(new NetAddress("meethigher.top", 80));
40+
3641
LockSupport.park();
3742
}
3843
}

src/test/java/top/meethigher/proxy/tcp/TcpRoundRobinLoadBalancerTest.java renamed to src/test/java/top/meethigher/proxy/tcp/RoundRobinLoadBalancerTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,19 @@
22

33
import org.junit.Test;
44
import top.meethigher.proxy.NetAddress;
5+
import top.meethigher.proxy.RoundRobinLoadBalancer;
56

67
import java.util.ArrayList;
78
import java.util.List;
89

9-
public class TcpRoundRobinLoadBalancerTest {
10+
public class RoundRobinLoadBalancerTest {
1011

1112
@Test
1213
public void next() {
1314
List<NetAddress> nodes = new ArrayList<>();
1415
nodes.add(new NetAddress("127.0.0.1", 6666));
1516
nodes.add(new NetAddress("127.0.0.1", 6667));
16-
TcpRoundRobinLoadBalancer balancer = TcpRoundRobinLoadBalancer.create(nodes);
17+
RoundRobinLoadBalancer balancer = RoundRobinLoadBalancer.create(nodes);
1718
System.out.println(balancer.next());
1819
System.out.println(balancer.next());
1920
System.out.println(balancer.next());

0 commit comments

Comments
 (0)