Skip to content

Commit 19236b0

Browse files
committed
* sse: removed default rate limit group ServerSentEventConfig.SSE_CONNECT_GROUP ("sse:connect")
* sse: support @limitrate on sse connect or class Signed-off-by: neo <1100909+neowu@users.noreply.github.com>
1 parent a136fa2 commit 19236b0

File tree

13 files changed

+92
-55
lines changed

13 files changed

+92
-55
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
## Change log
22

3+
### 9.1.8 (3/7/2025 - )
4+
5+
* sse: removed default rate limit group ServerSentEventConfig.SSE_CONNECT_GROUP ("sse:connect")
6+
* sse: support @LimitRate on sse connect or class
7+
> use @LimitRate and http().limitRate().add() to configure rate limit
8+
39
### 9.1.7 (2/26/2025 - 3/6/2025)
410

511
* sse: send ErrorResponse to client via "event: error" on exception

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ apply(plugin = "project")
77

88
subprojects {
99
group = "core.framework"
10-
version = "9.1.7"
10+
version = "9.1.8"
1111
}
1212

1313
val elasticVersion = "8.15.0"

core-ng/src/main/java/core/framework/internal/web/HTTPHandlerContext.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ public class HTTPHandlerContext {
1313
public final RequestBeanReader requestBeanReader = new RequestBeanReader();
1414
public final ResponseBeanWriter responseBeanWriter = new ResponseBeanWriter();
1515
public final RateControl rateControl = new RateControl();
16+
public boolean limitRate;
1617
@Nullable
1718
public IPv4AccessControl accessControl;
1819
}

core-ng/src/main/java/core/framework/internal/web/HTTPIOHandler.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {
7070
HttpString method = exchange.getRequestMethod();
7171
HeaderMap headers = exchange.getRequestHeaders();
7272

73-
var requestHandler = new HTTPRequestHandler(exchange, handler, sseHandler);
73+
var requestHandler = new Handler(exchange);
7474
boolean ws = webSocketHandler != null && webSocketHandler.check(method, headers); // TODO: retire ws and simplify
7575
boolean active = !requestHandler.sse && !ws;
7676
boolean shutdown = shutdownHandler.handle(exchange, active);
@@ -123,4 +123,26 @@ boolean hasBody(long contentLength, HttpString method) {
123123
if (contentLength == 0) return false; // if body is empty, skip reading
124124
return Methods.POST.equals(method) || Methods.PUT.equals(method) || Methods.PATCH.equals(method);
125125
}
126+
127+
public class Handler {
128+
private final boolean sse;
129+
private final HttpServerExchange exchange;
130+
131+
Handler(HttpServerExchange exchange) {
132+
this.exchange = exchange;
133+
134+
HttpString method = exchange.getRequestMethod();
135+
String path = exchange.getRequestPath();
136+
HeaderMap headers = exchange.getRequestHeaders();
137+
sse = sseHandler != null && sseHandler.check(method, path, headers);
138+
}
139+
140+
public void handle() {
141+
if (sse) {
142+
sseHandler.handleRequest(exchange); // not dispatch, continue in io thread
143+
} else {
144+
exchange.dispatch(handler);
145+
}
146+
}
147+
}
126148
}

core-ng/src/main/java/core/framework/internal/web/HTTPRequestHandler.java

Lines changed: 0 additions & 32 deletions
This file was deleted.

core-ng/src/main/java/core/framework/internal/web/request/RequestBodyReader.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package core.framework.internal.web.request;
22

3-
import core.framework.internal.web.HTTPRequestHandler;
3+
import core.framework.internal.web.HTTPIOHandler;
44
import core.framework.web.exception.BadRequestException;
55
import io.undertow.connector.PooledByteBuffer;
66
import io.undertow.server.HttpServerExchange;
@@ -20,13 +20,13 @@ public final class RequestBodyReader implements ChannelListener<StreamSourceChan
2020
static final AttachmentKey<RequestBody> REQUEST_BODY = AttachmentKey.create(RequestBody.class);
2121

2222
private final HttpServerExchange exchange;
23-
private final HTTPRequestHandler handler;
23+
private final HTTPIOHandler.Handler handler;
2424
private final int contentLength;
2525
private boolean complete;
2626
private byte[] body;
2727
private int position = 0;
2828

29-
public RequestBodyReader(HttpServerExchange exchange, HTTPRequestHandler handler) {
29+
public RequestBodyReader(HttpServerExchange exchange, HTTPIOHandler.Handler handler) {
3030
this.exchange = exchange;
3131
this.handler = handler;
3232
contentLength = (int) exchange.getRequestContentLength();
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,33 @@
11
package core.framework.internal.web.sse;
22

3+
import core.framework.web.Request;
4+
import core.framework.web.sse.Channel;
35
import core.framework.web.sse.ChannelListener;
46

7+
import java.lang.annotation.Annotation;
8+
import java.lang.reflect.Method;
9+
510
class ChannelSupport<T> {
611
final ChannelListener<T> listener;
712
final ServerSentEventContextImpl<T> context;
813
final ServerSentEventBuilder<T> builder;
14+
final Method targetMethod;
915

1016
ChannelSupport(ChannelListener<T> listener, Class<T> eventClass, ServerSentEventContextImpl<T> context) {
1117
this.listener = listener;
1218
this.context = context;
1319
builder = new ServerSentEventBuilder<>(eventClass);
20+
try {
21+
this.targetMethod = listener.getClass().getMethod("onConnect", Request.class, Channel.class, String.class);
22+
} catch (NoSuchMethodException e) {
23+
throw new Error("failed to get listener.onConnect method", e);
24+
}
25+
}
26+
27+
<V extends Annotation> V annotation(Class<V> annotationClass) {
28+
V annotation = targetMethod.getDeclaredAnnotation(annotationClass);
29+
if (annotation == null)
30+
annotation = listener.getClass().getDeclaredAnnotation(annotationClass);
31+
return annotation;
1432
}
1533
}

core-ng/src/main/java/core/framework/internal/web/sse/ServerSentEventHandler.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@
55
import core.framework.internal.log.ActionLog;
66
import core.framework.internal.log.LogManager;
77
import core.framework.internal.web.HTTPHandlerContext;
8+
import core.framework.internal.web.http.RateControl;
89
import core.framework.internal.web.request.RequestImpl;
910
import core.framework.internal.web.service.ErrorResponse;
1011
import core.framework.internal.web.session.ReadOnlySession;
1112
import core.framework.internal.web.session.SessionManager;
12-
import core.framework.module.ServerSentEventConfig;
1313
import core.framework.util.Strings;
14+
import core.framework.web.rate.LimitRate;
1415
import core.framework.web.sse.ChannelListener;
1516
import io.undertow.server.HttpHandler;
1617
import io.undertow.server.HttpServerExchange;
@@ -95,7 +96,10 @@ void handle(HttpServerExchange exchange, StreamSinkChannel sink) {
9596
@SuppressWarnings("unchecked")
9697
ChannelSupport<Object> support = (ChannelSupport<Object>) supports.get(key(request.method().name(), path)); // ServerSentEventHandler.check() ensures path exists
9798
actionLog.action("sse:" + path + ":connect");
98-
handlerContext.rateControl.validateRate(ServerSentEventConfig.SSE_CONNECT_GROUP, request.clientIP());
99+
100+
if (handlerContext.limitRate) {
101+
limitRate(handlerContext.rateControl, support, request.clientIP());
102+
}
99103

100104
channel = new ChannelImpl<>(exchange, sink, support.context, support.builder, actionLog.id);
101105
actionLog.context("channel", channel.id);
@@ -132,6 +136,14 @@ void handle(HttpServerExchange exchange, StreamSinkChannel sink) {
132136
}
133137
}
134138

139+
void limitRate(RateControl rateControl, ChannelSupport<Object> support, String clientIP) {
140+
LimitRate limitRate = support.annotation(LimitRate.class);
141+
if (limitRate != null) {
142+
String group = limitRate.value();
143+
rateControl.validateRate(group, clientIP);
144+
}
145+
}
146+
135147
byte[] errorResponse(byte[] errorResponse) {
136148
ByteBuffer buffer = ByteBuffer.wrap(new byte[errorResponse.length + 38]);
137149
buffer.put(Strings.bytes("retry: 86400000\n\nevent: error\ndata: ")); // tell browser retry in 24 hours

core-ng/src/main/java/core/framework/module/LimitRateConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ protected void initialize(ModuleContext context, String name) {
2020
// save at max 5K group/ip combination per pod, about 800K memory, to adapt with more ips/cc attack, better defense with cloud infra based solution together
2121
maxEntries(5000);
2222
context.httpServerConfig.interceptors.add(new LimitRateInterceptor(rateControl));
23+
context.httpServer.handlerContext.limitRate = true;
2324
}
2425

2526
@Override

core-ng/src/main/java/core/framework/module/ServerSentEventConfig.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@
1818
import java.time.Duration;
1919

2020
public class ServerSentEventConfig extends Config {
21-
// use http().limitRate().add(ServerSentEventConfig.SSE_CONNECT_GROUP, ...) to configure rate limiting for sse connections
22-
public static final String SSE_CONNECT_GROUP = "sse:connect";
23-
2421
private final Logger logger = LoggerFactory.getLogger(ServerSentEventConfig.class);
2522

2623
ModuleContext context;
@@ -31,13 +28,6 @@ protected void initialize(ModuleContext context, @Nullable String name) {
3128
this.context = context;
3229
}
3330

34-
@Override
35-
protected void validate() {
36-
if (!context.httpServer.handlerContext.rateControl.hasGroup(SSE_CONNECT_GROUP)) {
37-
context.httpServer.handlerContext.rateControl.config(SSE_CONNECT_GROUP, 10, 10, Duration.ofSeconds(30));
38-
}
39-
}
40-
4131
public <T> void listen(HTTPMethod method, String path, Class<T> eventClass, ChannelListener<T> listener) {
4232
if (HTTPIOHandler.HEALTH_CHECK_PATH.equals(path)) throw new Error("/health-check is reserved path");
4333
if (path.contains("/:")) throw new Error("listener path must be static, path=" + path);

0 commit comments

Comments
 (0)