Skip to content

Commit 0086dd8

Browse files
committed
Enforce cacheLimit in DefaultSubscriptionRegistry
When the cacheLimit is reached and there is an eviction from the updateCache, the accessCache is now also updated. This change also ensures that adding a destination to the cache is protected with synchronization on the updateCache. Issue: SPR-13555
1 parent 799a03e commit 0086dd8

File tree

3 files changed

+62
-39
lines changed

3 files changed

+62
-39
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/broker/AbstractSubscriptionRegistry.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.springframework.messaging.MessageHeaders;
2424
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
2525
import org.springframework.messaging.simp.SimpMessageType;
26+
import org.springframework.util.CollectionUtils;
27+
import org.springframework.util.LinkedMultiValueMap;
2628
import org.springframework.util.MultiValueMap;
2729

2830
/**
@@ -35,6 +37,10 @@
3537
*/
3638
public abstract class AbstractSubscriptionRegistry implements SubscriptionRegistry {
3739

40+
private static MultiValueMap<String, String> EMPTY_MAP =
41+
CollectionUtils.unmodifiableMultiValueMap(new LinkedMultiValueMap<String, String>(0));
42+
43+
3844
protected final Log logger = LogFactory.getLog(getClass());
3945

4046

@@ -104,7 +110,7 @@ public final MultiValueMap<String, String> findSubscriptions(Message<?> message)
104110
String destination = SimpMessageHeaderAccessor.getDestination(headers);
105111
if (destination == null) {
106112
logger.error("No destination in " + message);
107-
return null;
113+
return EMPTY_MAP;
108114
}
109115

110116
return findSubscriptionsInternal(destination, message);

spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -115,24 +115,7 @@ public void unregisterAllSubscriptions(String sessionId) {
115115

116116
@Override
117117
protected MultiValueMap<String, String> findSubscriptionsInternal(String destination, Message<?> message) {
118-
MultiValueMap<String, String> result = this.destinationCache.getSubscriptions(destination);
119-
if (result != null) {
120-
return result;
121-
}
122-
result = new LinkedMultiValueMap<String, String>();
123-
for (SessionSubscriptionInfo info : this.subscriptionRegistry.getAllSubscriptions()) {
124-
for (String destinationPattern : info.getDestinations()) {
125-
if (this.pathMatcher.match(destinationPattern, destination)) {
126-
for (String subscriptionId : info.getSubscriptions(destinationPattern)) {
127-
result.add(info.sessionId, subscriptionId);
128-
}
129-
}
130-
}
131-
}
132-
if (!result.isEmpty()) {
133-
this.destinationCache.addSubscriptions(destination, result);
134-
}
135-
return result;
118+
return this.destinationCache.getSubscriptions(destination, message);
136119
}
137120

138121
@Override
@@ -157,20 +140,38 @@ private class DestinationCache {
157140
new LinkedHashMap<String, MultiValueMap<String, String>>(DEFAULT_CACHE_LIMIT, 0.75f, true) {
158141
@Override
159142
protected boolean removeEldestEntry(Map.Entry<String, MultiValueMap<String, String>> eldest) {
160-
return size() > getCacheLimit();
143+
if (size() > getCacheLimit()) {
144+
accessCache.remove(eldest.getKey());
145+
return true;
146+
}
147+
else {
148+
return false;
149+
}
161150
}
162151
};
163152

164153

165-
public MultiValueMap<String, String> getSubscriptions(String destination) {
166-
return this.accessCache.get(destination);
167-
}
168-
169-
public void addSubscriptions(String destination, MultiValueMap<String, String> subscriptions) {
170-
synchronized (this.updateCache) {
171-
this.updateCache.put(destination, deepCopy(subscriptions));
172-
this.accessCache.put(destination, subscriptions);
154+
public MultiValueMap<String, String> getSubscriptions(String destination, Message<?> message) {
155+
MultiValueMap<String, String> result = this.accessCache.get(destination);
156+
if (result == null) {
157+
synchronized (this.updateCache) {
158+
result = new LinkedMultiValueMap<String, String>();
159+
for (SessionSubscriptionInfo info : subscriptionRegistry.getAllSubscriptions()) {
160+
for (String destinationPattern : info.getDestinations()) {
161+
if (getPathMatcher().match(destinationPattern, destination)) {
162+
for (String subscriptionId : info.getSubscriptions(destinationPattern)) {
163+
result.add(info.sessionId, subscriptionId);
164+
}
165+
}
166+
}
167+
}
168+
if (!result.isEmpty()) {
169+
this.updateCache.put(destination, deepCopy(result));
170+
this.accessCache.put(destination, result);
171+
}
172+
}
173173
}
174+
return result;
174175
}
175176

176177
public void updateAfterNewSubscription(String destination, String sessionId, String subsId) {

spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void registerSubscription() {
6969

7070
MultiValueMap<String, String> actual = this.registry.findSubscriptions(message(dest));
7171
assertEquals("Expected one element " + actual, 1, actual.size());
72-
assertEquals(Arrays.asList(subsId), actual.get(sessId));
72+
assertEquals(Collections.singletonList(subsId), actual.get(sessId));
7373
}
7474

7575
@Test
@@ -116,7 +116,7 @@ public void registerSubscriptionWithDestinationPattern() {
116116

117117
MultiValueMap<String, String> actual = this.registry.findSubscriptions(message(dest));
118118
assertEquals("Expected one element " + actual, 1, actual.size());
119-
assertEquals(Arrays.asList(subsId), actual.get(sessId));
119+
assertEquals(Collections.singletonList(subsId), actual.get(sessId));
120120
}
121121

122122
@Test // SPR-11657
@@ -143,34 +143,34 @@ public void registerSubscriptionsWithSimpleAndPatternDestinations() {
143143
actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
144144
assertEquals(2, actual.size());
145145
assertEquals(Arrays.asList(subs2, subs1), actual.get(sess1));
146-
assertEquals(Arrays.asList(subs1), actual.get(sess2));
146+
assertEquals(Collections.singletonList(subs1), actual.get(sess2));
147147

148148
this.registry.unregisterAllSubscriptions(sess1);
149149

150150
actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
151151
assertEquals(1, actual.size());
152-
assertEquals(Arrays.asList(subs1), actual.get(sess2));
152+
assertEquals(Collections.singletonList(subs1), actual.get(sess2));
153153

154154
this.registry.registerSubscription(subscribeMessage(sess1, subs1, "/topic/PRICE.STOCK.*.IBM"));
155155
this.registry.registerSubscription(subscribeMessage(sess1, subs2, "/topic/PRICE.STOCK.NASDAQ.IBM"));
156156

157157
actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
158158
assertEquals(2, actual.size());
159159
assertEquals(Arrays.asList(subs1, subs2), actual.get(sess1));
160-
assertEquals(Arrays.asList(subs1), actual.get(sess2));
160+
assertEquals(Collections.singletonList(subs1), actual.get(sess2));
161161

162162
this.registry.unregisterSubscription(unsubscribeMessage(sess1, subs2));
163163

164164
actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
165165
assertEquals(2, actual.size());
166-
assertEquals(Arrays.asList(subs1), actual.get(sess1));
167-
assertEquals(Arrays.asList(subs1), actual.get(sess2));
166+
assertEquals(Collections.singletonList(subs1), actual.get(sess1));
167+
assertEquals(Collections.singletonList(subs1), actual.get(sess2));
168168

169169
this.registry.unregisterSubscription(unsubscribeMessage(sess1, subs1));
170170

171171
actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
172172
assertEquals(1, actual.size());
173-
assertEquals(Arrays.asList(subs1), actual.get(sess2));
173+
assertEquals(Collections.singletonList(subs1), actual.get(sess2));
174174

175175
this.registry.unregisterSubscription(unsubscribeMessage(sess2, subs1));
176176

@@ -222,13 +222,13 @@ public void registerSubscriptionWithDestinationPatternRegex() {
222222
MultiValueMap<String, String> actual = this.registry.findSubscriptions(message);
223223

224224
assertEquals("Expected one element " + actual, 1, actual.size());
225-
assertEquals(Arrays.asList(subsId), actual.get(sessId));
225+
assertEquals(Collections.singletonList(subsId), actual.get(sessId));
226226

227227
message = message("/topic/PRICE.STOCK.NASDAQ.MSFT");
228228
actual = this.registry.findSubscriptions(message);
229229

230230
assertEquals("Expected one element " + actual, 1, actual.size());
231-
assertEquals(Arrays.asList(subsId), actual.get(sessId));
231+
assertEquals(Collections.singletonList(subsId), actual.get(sessId));
232232

233233
message = message("/topic/PRICE.STOCK.NASDAQ.VMW");
234234
actual = this.registry.findSubscriptions(message);
@@ -249,7 +249,7 @@ public void registerTwiceAndUnregisterSubscriptions() {
249249

250250
actual = this.registry.findSubscriptions(message("/foo"));
251251
assertEquals("Expected 1 element", 1, actual.size());
252-
assertEquals(Arrays.asList("subs02"), actual.get("sess01"));
252+
assertEquals(Collections.singletonList("subs02"), actual.get("sess01"));
253253

254254
this.registry.unregisterSubscription(unsubscribeMessage("sess01", "subs02"));
255255

@@ -345,6 +345,22 @@ public void findSubscriptionsReturnsMapSafeToIterateIncludingValues() throws Exc
345345
// no ConcurrentModificationException
346346
}
347347

348+
@Test // SPR-13555
349+
public void cacheLimitExceeded() throws Exception {
350+
this.registry.setCacheLimit(1);
351+
this.registry.registerSubscription(subscribeMessage("sess1", "1", "/foo"));
352+
this.registry.registerSubscription(subscribeMessage("sess1", "2", "/bar"));
353+
354+
assertEquals(1, this.registry.findSubscriptions(message("/foo")).size());
355+
assertEquals(1, this.registry.findSubscriptions(message("/bar")).size());
356+
357+
this.registry.registerSubscription(subscribeMessage("sess2", "1", "/foo"));
358+
this.registry.registerSubscription(subscribeMessage("sess2", "2", "/bar"));
359+
360+
assertEquals(2, this.registry.findSubscriptions(message("/foo")).size());
361+
assertEquals(2, this.registry.findSubscriptions(message("/bar")).size());
362+
}
363+
348364

349365
private Message<?> subscribeMessage(String sessionId, String subscriptionId, String destination) {
350366
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.SUBSCRIBE);

0 commit comments

Comments
 (0)