Skip to content

Commit 6d5af60

Browse files
committed
Expose DefaultSubscriptionRegistry's cache limit through SimpleBrokerMessageHandler and MessageBrokerRegistry
Issue: SPR-14516
1 parent 06edd23 commit 6d5af60

File tree

5 files changed

+110
-47
lines changed

5 files changed

+110
-47
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
* in memory and uses a {@link org.springframework.util.PathMatcher PathMatcher}
5151
* for matching destinations.
5252
*
53-
* <p>As of 4.2 this class supports a {@link #setSelectorHeaderName selector}
53+
* <p>As of 4.2, this class supports a {@link #setSelectorHeaderName selector}
5454
* header on subscription messages with Spring EL expressions evaluated against
5555
* the headers to filter out messages in addition to destination matching.
5656
*
@@ -65,11 +65,10 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
6565
public static final int DEFAULT_CACHE_LIMIT = 1024;
6666

6767

68-
/** The maximum number of entries in the cache */
69-
private volatile int cacheLimit = DEFAULT_CACHE_LIMIT;
70-
7168
private PathMatcher pathMatcher = new AntPathMatcher();
7269

70+
private volatile int cacheLimit = DEFAULT_CACHE_LIMIT;
71+
7372
private String selectorHeaderName = "selector";
7473

7574
private volatile boolean selectorHeaderInUse = false;
@@ -82,32 +81,32 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
8281

8382

8483
/**
85-
* Specify the maximum number of entries for the resolved destination cache.
86-
* Default is 1024.
84+
* Specify the {@link PathMatcher} to use.
8785
*/
88-
public void setCacheLimit(int cacheLimit) {
89-
this.cacheLimit = cacheLimit;
86+
public void setPathMatcher(PathMatcher pathMatcher) {
87+
this.pathMatcher = pathMatcher;
9088
}
9189

9290
/**
93-
* Return the maximum number of entries for the resolved destination cache.
91+
* Return the configured {@link PathMatcher}.
9492
*/
95-
public int getCacheLimit() {
96-
return this.cacheLimit;
93+
public PathMatcher getPathMatcher() {
94+
return this.pathMatcher;
9795
}
9896

9997
/**
100-
* Specify the {@link PathMatcher} to use.
98+
* Specify the maximum number of entries for the resolved destination cache.
99+
* Default is 1024.
101100
*/
102-
public void setPathMatcher(PathMatcher pathMatcher) {
103-
this.pathMatcher = pathMatcher;
101+
public void setCacheLimit(int cacheLimit) {
102+
this.cacheLimit = cacheLimit;
104103
}
105104

106105
/**
107-
* Return the configured {@link PathMatcher}.
106+
* Return the maximum number of entries for the resolved destination cache.
108107
*/
109-
public PathMatcher getPathMatcher() {
110-
return this.pathMatcher;
108+
public int getCacheLimit() {
109+
return this.cacheLimit;
111110
}
112111

113112
/**
@@ -123,12 +122,13 @@ public PathMatcher getPathMatcher() {
123122
* @since 4.2
124123
*/
125124
public void setSelectorHeaderName(String selectorHeaderName) {
126-
Assert.notNull(selectorHeaderName);
125+
Assert.notNull(selectorHeaderName, "'selectorHeaderName' must not be null");
127126
this.selectorHeaderName = selectorHeaderName;
128127
}
129128

130129
/**
131130
* Return the name for the selector header.
131+
* @since 4.2
132132
*/
133133
public String getSelectorHeaderName() {
134134
return this.selectorHeaderName;

spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
* {@link SubscriptionRegistry} and sends messages to subscribers.
4343
*
4444
* @author Rossen Stoyanchev
45+
* @author Juergen Hoeller
4546
* @since 4.0
4647
*/
4748
public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
@@ -54,6 +55,8 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
5455

5556
private PathMatcher pathMatcher;
5657

58+
private Integer cacheLimit;
59+
5760
private TaskScheduler taskScheduler;
5861

5962
private long[] heartbeatValue;
@@ -90,29 +93,54 @@ public void setSubscriptionRegistry(SubscriptionRegistry subscriptionRegistry) {
9093
Assert.notNull(subscriptionRegistry, "SubscriptionRegistry must not be null");
9194
this.subscriptionRegistry = subscriptionRegistry;
9295
initPathMatcherToUse();
93-
}
94-
95-
private void initPathMatcherToUse() {
96-
if (this.pathMatcher != null) {
97-
if (this.subscriptionRegistry instanceof DefaultSubscriptionRegistry) {
98-
((DefaultSubscriptionRegistry) this.subscriptionRegistry).setPathMatcher(this.pathMatcher);
99-
}
100-
}
96+
initCacheLimitToUse();
10197
}
10298

10399
public SubscriptionRegistry getSubscriptionRegistry() {
104100
return this.subscriptionRegistry;
105101
}
106102

107103
/**
108-
* When configured, the given PathMatcher is passed down to the
104+
* When configured, the given PathMatcher is passed down to the underlying
109105
* SubscriptionRegistry to use for matching destination to subscriptions.
106+
* <p>Default is a standard {@link org.springframework.util.AntPathMatcher}.
107+
* @since 4.1
108+
* @see #setSubscriptionRegistry
109+
* @see DefaultSubscriptionRegistry#setPathMatcher
110+
* @see org.springframework.util.AntPathMatcher
110111
*/
111112
public void setPathMatcher(PathMatcher pathMatcher) {
112113
this.pathMatcher = pathMatcher;
113114
initPathMatcherToUse();
114115
}
115116

117+
private void initPathMatcherToUse() {
118+
if (this.pathMatcher != null && this.subscriptionRegistry instanceof DefaultSubscriptionRegistry) {
119+
((DefaultSubscriptionRegistry) this.subscriptionRegistry).setPathMatcher(this.pathMatcher);
120+
}
121+
}
122+
123+
/**
124+
* When configured, the specified cache limit is passed down to the
125+
* underlying SubscriptionRegistry, overriding any default there.
126+
* <p>With a standard {@link DefaultSubscriptionRegistry}, the default
127+
* cache limit is 1024.
128+
* @since 4.3.2
129+
* @see #setSubscriptionRegistry
130+
* @see DefaultSubscriptionRegistry#setCacheLimit
131+
* @see DefaultSubscriptionRegistry#DEFAULT_CACHE_LIMIT
132+
*/
133+
public void setCacheLimit(Integer cacheLimit) {
134+
this.cacheLimit = cacheLimit;
135+
initCacheLimitToUse();
136+
}
137+
138+
private void initCacheLimitToUse() {
139+
if (this.cacheLimit != null && this.subscriptionRegistry instanceof DefaultSubscriptionRegistry) {
140+
((DefaultSubscriptionRegistry) this.subscriptionRegistry).setCacheLimit(this.cacheLimit);
141+
}
142+
}
143+
116144
/**
117145
* Configure the {@link org.springframework.scheduling.TaskScheduler} to
118146
* use for providing heartbeat support. Setting this property also sets the
@@ -130,6 +158,7 @@ public void setTaskScheduler(TaskScheduler taskScheduler) {
130158

131159
/**
132160
* Return the configured TaskScheduler.
161+
* @since 4.2
133162
*/
134163
public TaskScheduler getTaskScheduler() {
135164
return this.taskScheduler;
@@ -151,6 +180,7 @@ public void setHeartbeatValue(long[] heartbeat) {
151180

152181
/**
153182
* The configured value for the heart-beat settings.
183+
* @since 4.2
154184
*/
155185
public long[] getHeartbeatValue() {
156186
return this.heartbeatValue;
@@ -160,13 +190,15 @@ public long[] getHeartbeatValue() {
160190
* Configure a {@link MessageHeaderInitializer} to apply to the headers
161191
* of all messages sent to the client outbound channel.
162192
* <p>By default this property is not set.
193+
* @since 4.1
163194
*/
164195
public void setHeaderInitializer(MessageHeaderInitializer headerInitializer) {
165196
this.headerInitializer = headerInitializer;
166197
}
167198

168199
/**
169200
* Return the configured header initializer.
201+
* @since 4.1
170202
*/
171203
public MessageHeaderInitializer getHeaderInitializer() {
172204
return this.headerInitializer;

spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ protected final ChannelRegistration getClientInboundChannelRegistration() {
143143
}
144144

145145
/**
146-
* A hook for sub-classes to customize the message channel for inbound messages
146+
* A hook for subclasses to customize the message channel for inbound messages
147147
* from WebSocket clients.
148148
*/
149149
protected void configureClientInboundChannel(ChannelRegistration registration) {
@@ -176,7 +176,7 @@ protected final ChannelRegistration getClientOutboundChannelRegistration() {
176176
}
177177

178178
/**
179-
* A hook for sub-classes to customize the message channel for messages from
179+
* A hook for subclasses to customize the message channel for messages from
180180
* the application or message broker to WebSocket clients.
181181
*/
182182
protected void configureClientOutboundChannel(ChannelRegistration registration) {
@@ -224,7 +224,7 @@ protected final MessageBrokerRegistry getBrokerRegistry() {
224224
}
225225

226226
/**
227-
* A hook for sub-classes to customize message broker configuration through the
227+
* A hook for subclasses to customize message broker configuration through the
228228
* provided {@link MessageBrokerRegistry} instance.
229229
*/
230230
protected void configureMessageBroker(MessageBrokerRegistry registry) {
@@ -253,15 +253,15 @@ public SimpAnnotationMethodMessageHandler simpAnnotationMethodMessageHandler() {
253253
addReturnValueHandlers(returnValueHandlers);
254254
handler.setCustomReturnValueHandlers(returnValueHandlers);
255255

256-
PathMatcher pathMatcher = this.getBrokerRegistry().getPathMatcher();
256+
PathMatcher pathMatcher = getBrokerRegistry().getPathMatcher();
257257
if (pathMatcher != null) {
258258
handler.setPathMatcher(pathMatcher);
259259
}
260260
return handler;
261261
}
262262

263263
/**
264-
* Protected method for plugging in a custom sub-class of
264+
* Protected method for plugging in a custom subclass of
265265
* {@link org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler
266266
* SimpAnnotationMethodMessageHandler}.
267267
* @since 4.2
@@ -324,7 +324,6 @@ public MessageHandler userRegistryMessageHandler() {
324324
}
325325

326326
// Expose alias for 4.1 compatibility
327-
328327
@Bean(name={"messageBrokerTaskScheduler", "messageBrokerSockJsTaskScheduler"})
329328
public ThreadPoolTaskScheduler messageBrokerTaskScheduler() {
330329
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
@@ -461,6 +460,7 @@ public void handleMessage(Message<?> message) {
461460

462461
}
463462

463+
464464
private class NoOpBrokerMessageHandler extends AbstractBrokerMessageHandler {
465465

466466
public NoOpBrokerMessageHandler() {

spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2015 the original author or authors.
2+
* Copyright 2002-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -51,6 +51,8 @@ public class MessageBrokerRegistry {
5151

5252
private PathMatcher pathMatcher;
5353

54+
private Integer cacheLimit;
55+
5456

5557
public MessageBrokerRegistry(SubscribableChannel clientInboundChannel, MessageChannel clientOutboundChannel) {
5658
Assert.notNull(clientInboundChannel);
@@ -96,6 +98,16 @@ protected ChannelRegistration getBrokerChannelRegistration() {
9698
return this.brokerChannelRegistration;
9799
}
98100

101+
protected String getUserDestinationBroadcast() {
102+
return (this.brokerRelayRegistration != null ?
103+
this.brokerRelayRegistration.getUserDestinationBroadcast() : null);
104+
}
105+
106+
protected String getUserRegistryBroadcast() {
107+
return (this.brokerRelayRegistration != null ?
108+
this.brokerRelayRegistration.getUserRegistryBroadcast() : null);
109+
}
110+
99111
/**
100112
* Configure one or more prefixes to filter destinations targeting application
101113
* annotated methods. For example destinations prefixed with "/app" may be
@@ -137,16 +149,6 @@ protected String getUserDestinationPrefix() {
137149
return this.userDestinationPrefix;
138150
}
139151

140-
protected String getUserDestinationBroadcast() {
141-
return (this.brokerRelayRegistration != null ?
142-
this.brokerRelayRegistration.getUserDestinationBroadcast() : null);
143-
}
144-
145-
protected String getUserRegistryBroadcast() {
146-
return (this.brokerRelayRegistration != null ?
147-
this.brokerRelayRegistration.getUserRegistryBroadcast() : null);
148-
}
149-
150152
/**
151153
* Configure the PathMatcher to use to match the destinations of incoming
152154
* messages to {@code @MessageMapping} and {@code @SubscribeMapping} methods.
@@ -162,6 +164,7 @@ protected String getUserRegistryBroadcast() {
162164
* <p>When the simple broker is enabled, the PathMatcher configured here is
163165
* also used to match message destinations when brokering messages.
164166
* @since 4.1
167+
* @see org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry#setPathMatcher
165168
*/
166169
public MessageBrokerRegistry setPathMatcher(PathMatcher pathMatcher) {
167170
this.pathMatcher = pathMatcher;
@@ -172,6 +175,18 @@ protected PathMatcher getPathMatcher() {
172175
return this.pathMatcher;
173176
}
174177

178+
/**
179+
* Configure the cache limit to apply for registrations with the broker.
180+
* <p>This is currently only applied for the destination cache in the
181+
* subscription registry. The default cache limit there is 1024.
182+
* @since 4.3.2
183+
* @see org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry#setCacheLimit
184+
*/
185+
public MessageBrokerRegistry setCacheLimit(int cacheLimit) {
186+
this.cacheLimit = cacheLimit;
187+
return this;
188+
}
189+
175190

176191
protected SimpleBrokerMessageHandler getSimpleBroker(SubscribableChannel brokerChannel) {
177192
if (this.simpleBrokerRegistration == null && this.brokerRelayRegistration == null) {
@@ -180,6 +195,7 @@ protected SimpleBrokerMessageHandler getSimpleBroker(SubscribableChannel brokerC
180195
if (this.simpleBrokerRegistration != null) {
181196
SimpleBrokerMessageHandler handler = this.simpleBrokerRegistration.getMessageHandler(brokerChannel);
182197
handler.setPathMatcher(this.pathMatcher);
198+
handler.setCacheLimit(this.cacheLimit);
183199
return handler;
184200
}
185201
return null;

0 commit comments

Comments
 (0)