Skip to content

Commit 540f1f2

Browse files
committed
feat: 支持自定义线程池
1 parent ef9a8c0 commit 540f1f2

File tree

2 files changed

+29
-14
lines changed

2 files changed

+29
-14
lines changed

src/main/java/top/meethigher/TCPReverseProxy.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99
import java.net.InetAddress;
1010
import java.net.ServerSocket;
1111
import java.net.Socket;
12-
import java.util.concurrent.CompletableFuture;
1312
import java.util.concurrent.ExecutorService;
1413
import java.util.concurrent.Executors;
14+
import java.util.concurrent.ForkJoinPool;
1515

1616
public class TCPReverseProxy {
1717

@@ -29,24 +29,35 @@ public class TCPReverseProxy {
2929

3030
private ServerSocket serverSocket;
3131

32-
private final ExecutorService executor;
32+
private final ExecutorService bossExecutor = Executors.newFixedThreadPool(1);
33+
34+
private final ExecutorService workerExecutor;
35+
36+
37+
public TCPReverseProxy(String sourceHost, int sourcePort, String targetHost, int targetPort, ExecutorService workerExecutor) {
38+
this.sourceHost = sourceHost;
39+
this.sourcePort = sourcePort;
40+
this.targetHost = targetHost;
41+
this.targetPort = targetPort;
42+
this.workerExecutor = workerExecutor;
43+
}
3344

3445
public TCPReverseProxy(String sourceHost, int sourcePort, String targetHost, int targetPort) {
3546
this.sourceHost = sourceHost;
3647
this.sourcePort = sourcePort;
3748
this.targetHost = targetHost;
3849
this.targetPort = targetPort;
39-
executor = Executors.newFixedThreadPool(1);
50+
workerExecutor = ForkJoinPool.commonPool();
4051
}
4152

4253
public void start() {
43-
executor.submit(() -> {
54+
bossExecutor.submit(() -> {
4455
try {
4556
serverSocket = new ServerSocket(sourcePort, 0, InetAddress.getByName(sourceHost));
4657
log.info("proxy server started {}:{} <--> {}:{}", sourceHost, sourcePort, targetHost, targetPort);
4758
while (!serverSocket.isClosed()) {
4859
Socket sourceSocket = serverSocket.accept();
49-
CompletableFuture.runAsync(() -> {
60+
workerExecutor.execute(() -> {
5061
try {
5162
/**
5263
* 将sourceSocket获取的内容,写入到targetSocket
@@ -55,7 +66,7 @@ public void start() {
5566
Socket targetSocket = new Socket(targetHost, targetPort);
5667
log.info("{} connected, proxy to {}", sourceSocket.getRemoteSocketAddress().toString(), targetSocket.getRemoteSocketAddress().toString());
5768
// 监听源端主动传入的消息写给目标端
58-
CompletableFuture.runAsync(() -> {
69+
workerExecutor.execute(() -> {
5970
// isClosed只能监听本地连接状态。若远端关闭或者网络问题,无法监听到。
6071
if (sourceSocket.isClosed() || targetSocket.isClosed()) {
6172
return;
@@ -80,7 +91,7 @@ public void start() {
8091
}
8192
});
8293
// 监听目标端主动写回的消息写回源端
83-
CompletableFuture.runAsync(() -> {
94+
workerExecutor.execute(() -> {
8495
if (sourceSocket.isClosed() || targetSocket.isClosed()) {
8596
return;
8697
}
@@ -122,7 +133,7 @@ public void stop() {
122133
} catch (Exception ignore) {
123134
}
124135
}
125-
executor.shutdown();
136+
bossExecutor.shutdown();
126137
log.info("proxy server stoped {}:{} <--> {}:{}", sourceHost, sourcePort, targetHost, targetPort);
127138
}
128139

src/test/java/top/meethigher/TCPReverseProxyTest.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,33 +2,37 @@
22

33
import org.junit.Test;
44

5+
import java.util.concurrent.LinkedBlockingQueue;
6+
import java.util.concurrent.ThreadPoolExecutor;
57
import java.util.concurrent.TimeUnit;
68

79
public class TCPReverseProxyTest {
810

911

1012
@Test
1113
public void testSSH() throws Exception {
12-
TCPReverseProxy localhost = new TCPReverseProxy("localhost", 11, "10.0.0.9", 22);
14+
TCPReverseProxy localhost = new TCPReverseProxy("127.0.0.1", 11, "10.0.0.9", 22);
1315
localhost.start();
14-
TimeUnit.HOURS.sleep(1);
16+
TimeUnit.MINUTES.sleep(1);
1517
localhost.stop();
1618
}
1719

1820

1921
@Test
2022
public void testPostgreSQL() throws Exception {
21-
TCPReverseProxy localhost = new TCPReverseProxy("localhost", 22, "10.0.0.9", 5432);
23+
TCPReverseProxy localhost = new TCPReverseProxy("0.0.0.0", 22,
24+
"10.0.0.9", 5432,
25+
new ThreadPoolExecutor(50, 50, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>()));
2226
localhost.start();
23-
TimeUnit.HOURS.sleep(1);
27+
TimeUnit.MINUTES.sleep(1);
2428
localhost.stop();
2529
}
2630

2731
@Test
2832
public void testFTP() throws Exception {
29-
TCPReverseProxy localhost = new TCPReverseProxy("localhost", 33, "localhost", 66);
33+
TCPReverseProxy localhost = new TCPReverseProxy("127.0.0.1", 33, "127.0.0.1", 66);
3034
localhost.start();
31-
TimeUnit.HOURS.sleep(1);
35+
TimeUnit.MINUTES.sleep(1);
3236
localhost.stop();
3337
}
3438
}

0 commit comments

Comments
 (0)