Skip to content

Commit f3034da

Browse files
committed
feat: 调整api
1 parent 540f1f2 commit f3034da

File tree

4 files changed

+273
-178
lines changed

4 files changed

+273
-178
lines changed

src/main/java/top/meethigher/TCPReverseProxy.java

Lines changed: 0 additions & 140 deletions
This file was deleted.
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
package top.meethigher;
2+
3+
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
7+
import java.io.InputStream;
8+
import java.io.OutputStream;
9+
import java.net.InetAddress;
10+
import java.net.ServerSocket;
11+
import java.net.Socket;
12+
import java.util.HashMap;
13+
import java.util.Map;
14+
import java.util.concurrent.*;
15+
import java.util.concurrent.atomic.AtomicInteger;
16+
17+
/**
18+
* 简易的Tcp反向代理工具
19+
* 该工具只做简单使用,不适用于高并发的场景。
20+
*
21+
* @author chenchuancheng
22+
* @since 2024/10/13 21:34
23+
*/
24+
public class TcpReverseProxy {
25+
26+
private static final Logger log = LoggerFactory.getLogger(TcpReverseProxy.class);
27+
28+
private static final char[] ID_CHARACTERS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray();
29+
30+
private final String targetHost;
31+
32+
private final int targetPort;
33+
34+
private final ExecutorService bossExecutor;
35+
36+
private final String name;
37+
38+
private final Map<Socket, Socket> connections = new HashMap<>();
39+
40+
/**
41+
* 反向代理1个tcp连接,使用的是bio模式,需要使用两个线程,直到tcp连接关闭方能释放。
42+
* 因此建议线程池的 maxPoolSize=maxConnections*2
43+
*/
44+
private ExecutorService workerExecutor;
45+
46+
/**
47+
* 最大连接数
48+
*/
49+
private int maxConnections;
50+
51+
private int bufferSize = 2 * 1024;
52+
53+
54+
private ServerSocket serverSocket;
55+
56+
private String sourceHost = "0.0.0.0";
57+
58+
private int sourcePort = 88;
59+
60+
private TcpReverseProxy(String targetHost, int targetPort, String name) {
61+
this.name = name;
62+
this.targetHost = targetHost;
63+
this.targetPort = targetPort;
64+
this.maxConnections = 2;
65+
this.workerExecutor = new ThreadPoolExecutor(1, maxConnections * 2, 1, TimeUnit.MINUTES, new SynchronousQueue<>(), new ThreadFactory() {
66+
final AtomicInteger ai = new AtomicInteger(1);
67+
68+
@Override
69+
public Thread newThread(Runnable r) {
70+
Thread thread = new Thread(r);
71+
thread.setName(name + "-worker-" + ai.getAndIncrement());
72+
return thread;
73+
}
74+
});
75+
this.bossExecutor = Executors.newFixedThreadPool(1, r -> {
76+
Thread thread = new Thread(r);
77+
thread.setName(name + "-boss");
78+
return thread;
79+
});
80+
}
81+
82+
public static TcpReverseProxy create(String targetHost, int targetPort) {
83+
return new TcpReverseProxy(targetHost, targetPort, generateName());
84+
}
85+
86+
public static TcpReverseProxy create(String targetHost, int targetPort, String name) {
87+
return new TcpReverseProxy(targetHost, targetPort, name);
88+
}
89+
90+
public TcpReverseProxy workerExecutor(ExecutorService workerExecutor) {
91+
this.workerExecutor = workerExecutor;
92+
return this;
93+
}
94+
95+
public TcpReverseProxy maxConnections(int maxConnections) {
96+
this.maxConnections = maxConnections;
97+
return this;
98+
}
99+
100+
public TcpReverseProxy bufferSize(int bufferSize) {
101+
this.bufferSize = bufferSize;
102+
return this;
103+
}
104+
105+
public TcpReverseProxy host(String host) {
106+
this.sourceHost = host;
107+
return this;
108+
}
109+
110+
public TcpReverseProxy port(int port) {
111+
this.sourcePort = port;
112+
return this;
113+
}
114+
115+
public void start() {
116+
bossExecutor.submit(() -> {
117+
try {
118+
serverSocket = new ServerSocket(sourcePort, 0, InetAddress.getByName(sourceHost));
119+
log.info("{} started {}:{} <--> {}:{}", name, sourceHost, sourcePort, targetHost, targetPort);
120+
while (!serverSocket.isClosed()) {
121+
try {
122+
/**
123+
* 将sourceSocket获取的内容,写入到targetSocket
124+
* 然后将targetSocket写出的内容,转写给sourceSocket
125+
*/
126+
Socket sourceSocket = serverSocket.accept();
127+
if (connections.size() >= maxConnections) {
128+
sourceSocket.close();
129+
continue;
130+
}
131+
Socket targetSocket = new Socket(targetHost, targetPort);
132+
connections.put(sourceSocket, targetSocket);
133+
log.info("{} connected, proxy to {}", sourceSocket.getRemoteSocketAddress().toString(), targetSocket.getRemoteSocketAddress().toString());
134+
// 监听源端主动传入的消息写给目标端
135+
workerExecutor.execute(() -> {
136+
// isClosed只能监听本地连接状态。若远端关闭或者网络问题,无法监听到。
137+
if (sourceSocket.isClosed() || targetSocket.isClosed()) {
138+
return;
139+
}
140+
try (InputStream sourceIS = sourceSocket.getInputStream();
141+
OutputStream targetOS = targetSocket.getOutputStream()) {
142+
byte[] buffer = new byte[bufferSize];
143+
int len;
144+
while ((len = sourceIS.read(buffer)) != -1) {
145+
targetOS.write(buffer, 0, len);
146+
log.debug("transferred {} --> {}", sourceSocket, targetSocket);
147+
}
148+
targetOS.flush();
149+
} catch (Exception ignore) {
150+
} finally {
151+
try {
152+
log.info("closed {} <--> {}", sourceSocket, targetSocket);
153+
sourceSocket.close();
154+
targetSocket.close();
155+
connections.remove(sourceSocket);
156+
} catch (Exception ignore) {
157+
}
158+
}
159+
});
160+
// 监听目标端主动写回的消息写回源端
161+
workerExecutor.execute(() -> {
162+
if (sourceSocket.isClosed() || targetSocket.isClosed()) {
163+
return;
164+
}
165+
try (OutputStream sourceOS = sourceSocket.getOutputStream();
166+
InputStream targetIS = targetSocket.getInputStream()) {
167+
byte[] buffer = new byte[bufferSize];
168+
int len;
169+
while ((len = targetIS.read(buffer)) != -1) {
170+
sourceOS.write(buffer, 0, len);
171+
log.debug("transferred {} <-- {}", sourceSocket, targetSocket);
172+
}
173+
sourceOS.flush();
174+
} catch (Exception ignore) {
175+
} finally {
176+
try {
177+
log.info("closed {} <--> {}", sourceSocket, targetSocket);
178+
sourceSocket.close();
179+
targetSocket.close();
180+
connections.remove(sourceSocket);
181+
} catch (Exception ignore) {
182+
}
183+
}
184+
});
185+
} catch (Exception e) {
186+
log.error("error", e);
187+
}
188+
}
189+
} catch (Exception e) {
190+
log.error("{} start error", name, e);
191+
}
192+
});
193+
}
194+
195+
196+
public void stop() {
197+
if (serverSocket != null) {
198+
try {
199+
serverSocket.close();
200+
bossExecutor.shutdownNow();
201+
} catch (Exception ignore) {
202+
}
203+
log.info("{} stoped", name);
204+
}
205+
}
206+
207+
private static String generateName() {
208+
final String prefix = "TCPReverseProxy-";
209+
try {
210+
// 池号对于虚拟机来说是全局的,以避免在类加载器范围的环境中池号重叠
211+
synchronized (System.getProperties()) {
212+
final String next = String.valueOf(Integer.getInteger("top.meethigher.TcpReverseProxy.name", 0) + 1);
213+
System.setProperty("top.meethigher.TcpReverseProxy.name", next);
214+
return prefix + next;
215+
}
216+
} catch (Exception e) {
217+
final ThreadLocalRandom random = ThreadLocalRandom.current();
218+
final StringBuilder sb = new StringBuilder(prefix);
219+
for (int i = 0; i < 4; i++) {
220+
sb.append(ID_CHARACTERS[random.nextInt(62)]);
221+
}
222+
return sb.toString();
223+
}
224+
}
225+
}

0 commit comments

Comments
 (0)