Skip to content

Commit 2c21601

Browse files
committed
feat: ReverseHttpProxy日志调整
1 parent d12cfbb commit 2c21601

File tree

2 files changed

+182
-21
lines changed

2 files changed

+182
-21
lines changed

src/main/java/top/meethigher/proxy/http/ReverseHttpProxy.java

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -289,12 +289,12 @@ protected Object getRouteMetadata(Route route, String key) {
289289
* @param value 数据值
290290
*/
291291
protected void setContextData(RoutingContext ctx, String key, Object value) {
292-
ctx.put(key, value == null ? "" : value);
292+
ctx.put(key, value == null ? "null" : value);
293293
}
294294

295295
protected Object getContextData(RoutingContext ctx, String key) {
296296
Object metadata = ctx.get(key);
297-
return metadata == null ? "" : metadata;
297+
return metadata == null ? "null" : metadata;
298298
}
299299

300300
protected HttpServerResponse setStatusCode(RoutingContext ctx, HttpServerResponse resp, int code) {
@@ -630,14 +630,14 @@ protected Handler<AsyncResult<HttpClientRequest>> connectHandler(RoutingContext
630630
setContextData(ctx, INTERNAL_CLIENT_CONNECTION_OPEN, true);
631631

632632
// 注册客户端与代理服务之间连接的断开监听事件。可监听主动关闭和被动关闭
633-
setContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR, clientReq.connection().localAddress().toString());
634-
setContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR, clientReq.connection().remoteAddress().toString());
635-
log.debug("{} --> {} connected", getContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR), getContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR));
636-
633+
HttpConnection connection = clientReq.connection();
634+
setContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR, connection.localAddress().toString());
635+
setContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR, connection.remoteAddress().toString());
636+
log.debug("target {} -- {} connected", getContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR), getContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR));
637637

638-
clientReq.connection().closeHandler(v -> {
638+
connection.closeHandler(v -> {
639639
setContextData(ctx, INTERNAL_CLIENT_CONNECTION_OPEN, false);
640-
log.debug("{} --> {} closed", getContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR), getContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR));
640+
log.debug("target {} -- {} closed", getContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR), getContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR));
641641
});
642642

643643

@@ -679,6 +679,15 @@ protected Handler<RoutingContext> routingContextHandler(HttpClient httpClient) {
679679
// 暂停流读取
680680
ctx.request().pause();
681681

682+
HttpConnection connection = ctx.request().connection();
683+
setContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR, connection.remoteAddress().toString());
684+
setContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR, connection.localAddress().toString());
685+
// 记录请求开始时间
686+
setContextData(ctx, INTERNAL_SEND_TIMESTAMP, System.currentTimeMillis());
687+
// 记录连接状态
688+
setContextData(ctx, INTERNAL_SERVER_CONNECTION_OPEN, true);
689+
log.debug("source {} -- {} connected", getContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR), getContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR));
690+
682691
// vertx的uri()是包含query参数的。而path()才是我们常说的不带有query的uri
683692
// route不是线程安全的。route里的metadata应以路由为单元存储,而不是以请求为单元存储。一个路由会有很多请求。
684693
// 若想要以请求为单元存储数据,应该使用routingContext.put
@@ -687,11 +696,6 @@ protected Handler<RoutingContext> routingContextHandler(HttpClient httpClient) {
687696
setContextData(ctx, key, ctx.currentRoute().getMetadata(key));
688697
}
689698

690-
// 记录请求开始时间
691-
setContextData(ctx, INTERNAL_SEND_TIMESTAMP, System.currentTimeMillis());
692-
// 记录连接状态
693-
setContextData(ctx, INTERNAL_SERVER_CONNECTION_OPEN, true);
694-
695699
// 获取代理地址
696700
String proxyUrl = getProxyUrl(ctx, ctx.request(), ctx.response());
697701
setContextData(ctx, INTERNAL_PROXY_URL, proxyUrl);
@@ -707,15 +711,9 @@ protected Handler<RoutingContext> routingContextHandler(HttpClient httpClient) {
707711
requestOptions.setMethod(ctx.request().method());
708712
requestOptions.setFollowRedirects(getContextData(ctx, P_FOLLOW_REDIRECTS) != null && Boolean.parseBoolean(getContextData(ctx, P_FOLLOW_REDIRECTS).toString()));
709713

710-
// 注册客户端与代理服务之间连接的断开监听事件。可监听主动关闭和被动关闭
711-
setContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR, ctx.request().connection().remoteAddress().toString());
712-
setContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR, ctx.request().connection().localAddress().toString());
713-
714-
log.debug("{} <-- {} connected", getContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR), getContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR));
715-
716-
ctx.request().connection().closeHandler(v -> {
714+
connection.closeHandler(v -> {
717715
setContextData(ctx, INTERNAL_SERVER_CONNECTION_OPEN, false);
718-
log.debug("{} <-- {} closed", getContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR), getContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR));
716+
log.debug("source {} -- {} closed", getContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR), getContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR));
719717
});
720718

721719
// 如果跨域由代理服务接管,那么针对跨域使用的OPTIONS预检请求,就由代理服务接管,而不经过实际的后端服务
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License 2.0 which is available at
6+
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
*/
11+
package top.meethigher.proxy.tcp;
12+
13+
import io.vertx.core.*;
14+
import io.vertx.core.net.NetSocket;
15+
import io.vertx.core.streams.ReadStream;
16+
import io.vertx.core.streams.WriteStream;
17+
import org.slf4j.Logger;
18+
import org.slf4j.LoggerFactory;
19+
20+
public class Pipe<T> implements io.vertx.core.streams.Pipe<T> {
21+
22+
private static final Logger log = LoggerFactory.getLogger(Pipe.class);
23+
private final Promise<Void> result;
24+
private final ReadStream<T> src;
25+
private boolean endOnSuccess = true;
26+
private boolean endOnFailure = true;
27+
private WriteStream<T> dst;
28+
29+
public Pipe(ReadStream<T> src) {
30+
this.src = src;
31+
this.result = Promise.promise();
32+
33+
if (src instanceof NetSocket) {
34+
NetSocket tsrc = (NetSocket) src;
35+
tsrc.remoteAddress();
36+
tsrc.localAddress();
37+
}
38+
39+
// Set handlers now
40+
src.endHandler(result::tryComplete);
41+
src.exceptionHandler(result::tryFail);
42+
}
43+
44+
@Override
45+
public synchronized io.vertx.core.streams.Pipe<T> endOnFailure(boolean end) {
46+
endOnFailure = end;
47+
return this;
48+
}
49+
50+
@Override
51+
public synchronized io.vertx.core.streams.Pipe<T> endOnSuccess(boolean end) {
52+
endOnSuccess = end;
53+
return this;
54+
}
55+
56+
@Override
57+
public synchronized io.vertx.core.streams.Pipe<T> endOnComplete(boolean end) {
58+
endOnSuccess = end;
59+
endOnFailure = end;
60+
return this;
61+
}
62+
63+
private void handleWriteResult(AsyncResult<Void> ack) {
64+
if (ack.failed()) {
65+
result.tryFail(new WriteException(ack.cause()));
66+
}
67+
}
68+
69+
@Override
70+
public void to(WriteStream<T> ws, Handler<AsyncResult<Void>> completionHandler) {
71+
if (ws == null) {
72+
throw new NullPointerException();
73+
}
74+
boolean endOnSuccess;
75+
boolean endOnFailure;
76+
synchronized (Pipe.this) {
77+
if (dst != null) {
78+
throw new IllegalStateException();
79+
}
80+
dst = ws;
81+
endOnSuccess = this.endOnSuccess;
82+
endOnFailure = this.endOnFailure;
83+
}
84+
Handler<Void> drainHandler = v -> src.resume();
85+
src.handler(item -> {
86+
if (src instanceof NetSocket) {
87+
NetSocket tsrc = (NetSocket) src;
88+
log.info("{} -- {} received:\n{}", tsrc.remoteAddress(), tsrc.localAddress(),
89+
item.toString());
90+
}
91+
ws.write(item, this::handleWriteResult);
92+
if (ws.writeQueueFull()) {
93+
src.pause();
94+
ws.drainHandler(drainHandler);
95+
}
96+
});
97+
src.resume();
98+
result.future().onComplete(ar -> {
99+
try {
100+
src.handler(null);
101+
} catch (Exception ignore) {
102+
}
103+
try {
104+
src.exceptionHandler(null);
105+
} catch (Exception ignore) {
106+
}
107+
try {
108+
src.endHandler(null);
109+
} catch (Exception ignore) {
110+
}
111+
if (ar.succeeded()) {
112+
handleSuccess(completionHandler);
113+
} else {
114+
Throwable err = ar.cause();
115+
if (err instanceof WriteException) {
116+
src.resume();
117+
err = err.getCause();
118+
}
119+
handleFailure(err, completionHandler);
120+
}
121+
});
122+
}
123+
124+
private void handleSuccess(Handler<AsyncResult<Void>> completionHandler) {
125+
if (endOnSuccess) {
126+
dst.end(completionHandler);
127+
} else {
128+
completionHandler.handle(Future.succeededFuture());
129+
}
130+
}
131+
132+
private void handleFailure(Throwable cause, Handler<AsyncResult<Void>> completionHandler) {
133+
Future<Void> res = Future.failedFuture(cause);
134+
if (endOnFailure) {
135+
dst.end(ignore -> {
136+
completionHandler.handle(res);
137+
});
138+
} else {
139+
completionHandler.handle(res);
140+
}
141+
}
142+
143+
public void close() {
144+
synchronized (this) {
145+
src.exceptionHandler(null);
146+
src.handler(null);
147+
if (dst != null) {
148+
dst.drainHandler(null);
149+
dst.exceptionHandler(null);
150+
}
151+
}
152+
VertxException err = new VertxException("Pipe closed", true);
153+
if (result.tryFail(err)) {
154+
src.resume();
155+
}
156+
}
157+
158+
private static class WriteException extends VertxException {
159+
private WriteException(Throwable cause) {
160+
super(cause, true);
161+
}
162+
}
163+
}

0 commit comments

Comments
 (0)