|
| 1 | +package top.meethigher; |
| 2 | + |
| 3 | +import io.vertx.core.AsyncResult; |
| 4 | +import io.vertx.core.Future; |
| 5 | +import io.vertx.core.Handler; |
| 6 | +import io.vertx.core.Vertx; |
| 7 | +import io.vertx.core.net.NetClient; |
| 8 | +import io.vertx.core.net.NetServer; |
| 9 | +import io.vertx.core.net.NetSocket; |
| 10 | +import org.slf4j.Logger; |
| 11 | +import org.slf4j.LoggerFactory; |
| 12 | + |
| 13 | +import java.util.concurrent.ThreadLocalRandom; |
| 14 | + |
| 15 | + |
| 16 | +public class VertxTCPReverseProxy { |
| 17 | + |
| 18 | + private static final Logger log = LoggerFactory.getLogger(VertxTCPReverseProxy.class); |
| 19 | + |
| 20 | + private static final char[] ID_CHARACTERS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray(); |
| 21 | + |
| 22 | + private String sourceHost = "0.0.0.0"; |
| 23 | + |
| 24 | + private int sourcePort = 999; |
| 25 | + |
| 26 | + private final NetServer netServer; |
| 27 | + |
| 28 | + private final NetClient netClient; |
| 29 | + |
| 30 | + private final String targetHost; |
| 31 | + |
| 32 | + private final int targetPort; |
| 33 | + |
| 34 | + private final String name; |
| 35 | + |
| 36 | + private VertxTCPReverseProxy(NetServer netServer, NetClient netClient, |
| 37 | + String targetHost, int targetPort, String name) { |
| 38 | + this.name = name; |
| 39 | + this.targetHost = targetHost; |
| 40 | + this.targetPort = targetPort; |
| 41 | + this.netServer = netServer; |
| 42 | + this.netClient = netClient; |
| 43 | + } |
| 44 | + |
| 45 | + public static VertxTCPReverseProxy create(Vertx vertx, |
| 46 | + String targetHost, int targetPort, String name) { |
| 47 | + return new VertxTCPReverseProxy(vertx.createNetServer(), vertx.createNetClient(), targetHost, targetPort, name); |
| 48 | + } |
| 49 | + |
| 50 | + public static VertxTCPReverseProxy create(Vertx vertx, |
| 51 | + String targetHost, int targetPort) { |
| 52 | + return new VertxTCPReverseProxy(vertx.createNetServer(), vertx.createNetClient(), targetHost, targetPort, generateName()); |
| 53 | + } |
| 54 | + |
| 55 | + public VertxTCPReverseProxy port(int port) { |
| 56 | + this.sourcePort = port; |
| 57 | + return this; |
| 58 | + } |
| 59 | + |
| 60 | + public VertxTCPReverseProxy host(String host) { |
| 61 | + this.sourceHost = host; |
| 62 | + return this; |
| 63 | + } |
| 64 | + |
| 65 | + |
| 66 | + private static String generateName() { |
| 67 | + final String prefix = "VertxTCPReverseProxy-"; |
| 68 | + try { |
| 69 | + // 池号对于虚拟机来说是全局的,以避免在类加载器范围的环境中池号重叠 |
| 70 | + synchronized (System.getProperties()) { |
| 71 | + final String next = String.valueOf(Integer.getInteger("top.meethigher.VertxTCPReverseProxy.name", 0) + 1); |
| 72 | + System.setProperty("top.meethigher.VertxTCPReverseProxy.name", next); |
| 73 | + return prefix + next; |
| 74 | + } |
| 75 | + } catch (Exception e) { |
| 76 | + final ThreadLocalRandom random = ThreadLocalRandom.current(); |
| 77 | + final StringBuilder sb = new StringBuilder(prefix); |
| 78 | + for (int i = 0; i < 4; i++) { |
| 79 | + sb.append(ID_CHARACTERS[random.nextInt(62)]); |
| 80 | + } |
| 81 | + return sb.toString(); |
| 82 | + } |
| 83 | + } |
| 84 | + |
| 85 | + public void start() { |
| 86 | + Handler<NetSocket> connectHandler = sourceSocket -> { |
| 87 | + sourceSocket.pause(); |
| 88 | + netClient.connect(targetPort, targetHost) |
| 89 | + .onSuccess(targetSocket -> { |
| 90 | + log.info("{} connected, proxy to {}", sourceSocket.remoteAddress().toString(), targetSocket.remoteAddress().toString()); |
| 91 | + targetSocket.pause(); |
| 92 | + sourceSocket.pipeTo(targetSocket); |
| 93 | + targetSocket.closeHandler(v -> sourceSocket.close()).pipeTo(sourceSocket); |
| 94 | + sourceSocket.resume(); |
| 95 | + targetSocket.resume(); |
| 96 | + }) |
| 97 | + .onFailure(e -> log.error("failed to connect to {}:{}", targetHost, targetPort, e)); |
| 98 | + |
| 99 | + }; |
| 100 | + Handler<Throwable> connectFailedHandler = e -> log.error("connect failed", e); |
| 101 | + Handler<AsyncResult<NetServer>> asyncResultHandler = ar -> { |
| 102 | + if (ar.succeeded()) { |
| 103 | + log.info("{} started on {}:{}", name, sourceHost, sourcePort); |
| 104 | + } else { |
| 105 | + Throwable e = ar.cause(); |
| 106 | + log.error("{} start failed", name, e); |
| 107 | + } |
| 108 | + }; |
| 109 | + netServer.connectHandler(connectHandler).exceptionHandler(connectFailedHandler); |
| 110 | + Future<NetServer> listen = sourceHost == null ? netServer.listen(sourcePort) : netServer.listen(sourcePort, sourceHost); |
| 111 | + listen.onComplete(asyncResultHandler); |
| 112 | + } |
| 113 | + |
| 114 | + public void stop() { |
| 115 | + netServer.close() |
| 116 | + .onSuccess(v -> log.info("{} closed", name)) |
| 117 | + .onFailure(e -> log.error("{} close failed", name, e)); |
| 118 | + } |
| 119 | + |
| 120 | +} |
0 commit comments