Skip to content

Commit 79de45b

Browse files
committed
Add cache limit to DefaultSessionRegistry and polish
Issue: SPR-11657
1 parent 63a868d commit 79de45b

File tree

2 files changed

+147
-122
lines changed

2 files changed

+147
-122
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java

Lines changed: 100 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@
1616

1717
package org.springframework.messaging.simp.broker;
1818

19-
import java.util.*;
19+
import java.util.Collection;
20+
import java.util.HashSet;
21+
import java.util.LinkedHashMap;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.Set;
2025
import java.util.concurrent.ConcurrentHashMap;
2126
import java.util.concurrent.ConcurrentMap;
2227

@@ -25,6 +30,7 @@
2530
import org.springframework.util.Assert;
2631
import org.springframework.util.LinkedMultiValueMap;
2732
import org.springframework.util.MultiValueMap;
33+
import org.springframework.util.PathMatcher;
2834

2935
/**
3036
* A default, simple in-memory implementation of {@link SubscriptionRegistry}.
@@ -35,28 +41,54 @@
3541
*/
3642
public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
3743

44+
/** Default maximum number of entries for the destination cache: 1024 */
45+
public static final int DEFAULT_CACHE_LIMIT = 1024;
46+
47+
48+
/** The maximum number of entries in the cache */
49+
private volatile int cacheLimit = DEFAULT_CACHE_LIMIT;
50+
3851
private final DestinationCache destinationCache = new DestinationCache();
3952

4053
private final SessionSubscriptionRegistry subscriptionRegistry = new SessionSubscriptionRegistry();
4154

42-
private AntPathMatcher pathMatcher = new AntPathMatcher();
55+
private PathMatcher pathMatcher = new AntPathMatcher();
56+
4357

4458

4559
/**
46-
* @param pathMatcher the pathMatcher to set
60+
* Specify the maximum number of entries for the resolved destination cache.
61+
* Default is 1024.
4762
*/
48-
public void setPathMatcher(AntPathMatcher pathMatcher) {
63+
public void setCacheLimit(int cacheLimit) {
64+
this.cacheLimit = cacheLimit;
65+
}
66+
67+
/**
68+
* Return the maximum number of entries for the resolved destination cache.
69+
*/
70+
public int getCacheLimit() {
71+
return this.cacheLimit;
72+
}
73+
74+
/**
75+
* The PathMatcher to use.
76+
*/
77+
public void setPathMatcher(PathMatcher pathMatcher) {
4978
this.pathMatcher = pathMatcher;
5079
}
5180

52-
public AntPathMatcher getPathMatcher() {
81+
/**
82+
* The configured PathMatcher.
83+
*/
84+
public PathMatcher getPathMatcher() {
5385
return this.pathMatcher;
5486
}
5587

5688
@Override
5789
protected void addSubscriptionInternal(String sessionId, String subsId, String destination, Message<?> message) {
58-
SessionSubscriptionInfo info = this.subscriptionRegistry.addSubscription(sessionId, subsId, destination);
59-
this.destinationCache.mapToDestination(destination, sessionId, subsId);
90+
this.subscriptionRegistry.addSubscription(sessionId, subsId, destination);
91+
this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId);
6092
}
6193

6294
@Override
@@ -65,7 +97,7 @@ protected void removeSubscriptionInternal(String sessionId, String subsId, Messa
6597
if (info != null) {
6698
String destination = info.removeSubscription(subsId);
6799
if (info.getSubscriptions(destination) == null) {
68-
this.destinationCache.unmapFromDestination(destination, sessionId, subsId);
100+
this.destinationCache.updateAfterRemovedSubscription(destination, sessionId, subsId);
69101
}
70102
}
71103
}
@@ -77,30 +109,28 @@ public void unregisterAllSubscriptions(String sessionId) {
77109
if (logger.isDebugEnabled()) {
78110
logger.debug("Unregistering subscriptions for sessionId=" + sessionId);
79111
}
80-
this.destinationCache.removeSessionSubscriptions(info);
112+
this.destinationCache.updateAfterRemovedSession(info);
81113
}
82114
}
83115

84116
@Override
85117
protected MultiValueMap<String, String> findSubscriptionsInternal(String destination, Message<?> message) {
86-
MultiValueMap<String,String> result;
87-
if (this.destinationCache.isCachedDestination(destination)) {
88-
result = this.destinationCache.getSubscriptions(destination);
118+
MultiValueMap<String,String> result = this.destinationCache.getSubscriptions(destination);
119+
if (result != null) {
120+
return result;
89121
}
90-
else {
91-
result = new LinkedMultiValueMap<String, String>();
92-
for (SessionSubscriptionInfo info : this.subscriptionRegistry.getAllSubscriptions()) {
93-
for (String destinationPattern : info.getDestinations()) {
94-
if (this.pathMatcher.match(destinationPattern, destination)) {
95-
for (String subscriptionId : info.getSubscriptions(destinationPattern)) {
96-
result.add(info.sessionId, subscriptionId);
97-
}
122+
result = new LinkedMultiValueMap<String, String>();
123+
for (SessionSubscriptionInfo info : this.subscriptionRegistry.getAllSubscriptions()) {
124+
for (String destinationPattern : info.getDestinations()) {
125+
if (this.pathMatcher.match(destinationPattern, destination)) {
126+
for (String subscriptionId : info.getSubscriptions(destinationPattern)) {
127+
result.add(info.sessionId, subscriptionId);
98128
}
99129
}
100130
}
101-
if(!result.isEmpty()) {
102-
this.destinationCache.addSubscriptions(destination, result);
103-
}
131+
}
132+
if(!result.isEmpty()) {
133+
this.destinationCache.addSubscriptions(destination, result);
104134
}
105135
return result;
106136
}
@@ -112,85 +142,84 @@ public String toString() {
112142
}
113143

114144

115-
116-
117145
/**
118-
* Provide direct lookup of session subscriptions by destination
146+
* A cache for destinations previously resolved via
147+
* {@link DefaultSubscriptionRegistry#findSubscriptionsInternal(String, Message)}
119148
*/
120-
private static class DestinationCache {
121-
122-
private AntPathMatcher pathMatcher = new AntPathMatcher();
149+
private class DestinationCache {
150+
151+
/** Map from destination -> <sessionId, subscriptionId> */
152+
@SuppressWarnings("serial")
153+
private final Map<String, MultiValueMap<String, String>> cache =
154+
new LinkedHashMap<String, MultiValueMap<String, String>>(DEFAULT_CACHE_LIMIT, 0.75f, true) {
155+
@Override
156+
protected boolean removeEldestEntry(Map.Entry<String, MultiValueMap<String, String>> eldest) {
157+
return size() > getCacheLimit();
158+
}
159+
};
123160

124-
// destination -> ..
125-
private final Map<String, MultiValueMap<String, String>> subscriptionsByDestination =
126-
new ConcurrentHashMap<String, MultiValueMap<String, String>>();
127161

128-
private final Object monitor = new Object();
129162

163+
public MultiValueMap<String, String> getSubscriptions(String destination) {
164+
synchronized (this.cache) {
165+
return this.cache.get(destination);
166+
}
167+
}
130168

131169
public void addSubscriptions(String destination, MultiValueMap<String, String> subscriptions) {
132-
this.subscriptionsByDestination.put(destination, subscriptions);
170+
synchronized (this.cache) {
171+
this.cache.put(destination, subscriptions);
172+
}
133173
}
134174

135-
public void mapToDestination(String destination, String sessionId, String subsId) {
136-
synchronized(this.monitor) {
137-
for (String cachedDestination : this.subscriptionsByDestination.keySet()) {
138-
if (this.pathMatcher.match(destination, cachedDestination)) {
139-
MultiValueMap<String, String> registrations = this.subscriptionsByDestination.get(cachedDestination);
140-
if (registrations == null) {
141-
registrations = new LinkedMultiValueMap<String, String>();
142-
}
143-
registrations.add(sessionId, subsId);
175+
public void updateAfterNewSubscription(String destination, String sessionId, String subsId) {
176+
synchronized(this.cache) {
177+
for (String cachedDestination : this.cache.keySet()) {
178+
if (getPathMatcher().match(destination, cachedDestination)) {
179+
MultiValueMap<String, String> subscriptions = this.cache.get(cachedDestination);
180+
subscriptions.add(sessionId, subsId);
144181
}
145182
}
146183
}
147184
}
148185

149-
public void unmapFromDestination(String destination, String sessionId, String subsId) {
150-
synchronized(this.monitor) {
151-
for (String cachedDestination : this.subscriptionsByDestination.keySet()) {
152-
if (this.pathMatcher.match(destination, cachedDestination)) {
153-
MultiValueMap<String, String> registrations = this.subscriptionsByDestination.get(cachedDestination);
154-
List<String> subscriptions = registrations.get(sessionId);
155-
while(subscriptions.remove(subsId));
156-
if (subscriptions.isEmpty()) {
157-
registrations.remove(sessionId);
186+
public void updateAfterRemovedSubscription(String destination, String sessionId, String subsId) {
187+
synchronized(this.cache) {
188+
for (String cachedDestination : this.cache.keySet()) {
189+
if (getPathMatcher().match(destination, cachedDestination)) {
190+
MultiValueMap<String, String> subscriptions = this.cache.get(cachedDestination);
191+
List<String> subsIds = subscriptions.get(sessionId);
192+
subsIds.remove(subsId);
193+
if (subsIds.isEmpty()) {
194+
subscriptions.remove(sessionId);
158195
}
159-
if (registrations.isEmpty()) {
160-
this.subscriptionsByDestination.remove(cachedDestination);
196+
if (subscriptions.isEmpty()) {
197+
this.cache.remove(cachedDestination);
161198
}
162199
}
163200
}
164201
}
165202
}
166203

167-
public void removeSessionSubscriptions(SessionSubscriptionInfo info) {
168-
synchronized(this.monitor) {
204+
public void updateAfterRemovedSession(SessionSubscriptionInfo info) {
205+
synchronized(this.cache) {
169206
for (String destination : info.getDestinations()) {
170-
for (String cachedDestination : this.subscriptionsByDestination.keySet()) {
171-
if (this.pathMatcher.match(destination, cachedDestination)) {
172-
MultiValueMap<String, String> map = this.subscriptionsByDestination.get(cachedDestination);
207+
for (String cachedDestination : this.cache.keySet()) {
208+
if (getPathMatcher().match(destination, cachedDestination)) {
209+
MultiValueMap<String, String> map = this.cache.get(cachedDestination);
173210
map.remove(info.getSessionId());
174211
if (map.isEmpty()) {
175-
this.subscriptionsByDestination.remove(cachedDestination);
212+
this.cache.remove(cachedDestination);
176213
}
177214
}
178215
}
179216
}
180217
}
181218
}
182219

183-
public MultiValueMap<String, String> getSubscriptions(String destination) {
184-
return this.subscriptionsByDestination.get(destination);
185-
}
186-
187-
public boolean isCachedDestination(String destination) {
188-
return subscriptionsByDestination.containsKey(destination);
189-
}
190-
191220
@Override
192221
public String toString() {
193-
return "[subscriptionsByDestination=" + this.subscriptionsByDestination + "]";
222+
return "[cache=" + this.cache + "]";
194223
}
195224
}
196225

@@ -199,6 +228,7 @@ public String toString() {
199228
*/
200229
private static class SessionSubscriptionRegistry {
201230

231+
// sessionId -> SessionSubscriptionInfo
202232
private final ConcurrentMap<String, SessionSubscriptionInfo> sessions =
203233
new ConcurrentHashMap<String, SessionSubscriptionInfo>();
204234

@@ -241,6 +271,7 @@ private static class SessionSubscriptionInfo {
241271

242272
private final String sessionId;
243273

274+
// destination -> subscriptionIds
244275
private final Map<String, Set<String>> subscriptions = new ConcurrentHashMap<String, Set<String>>(4);
245276

246277
private final Object monitor = new Object();

spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java

Lines changed: 47 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -134,59 +134,53 @@ public void registerSubscriptionWithDestinationPattern() {
134134
// SPR-11657
135135

136136
@Test
137-
public void registerMultipleSubscriptionsWithOneUsingDestinationPattern() {
138-
139-
String sessId1 = "sess01";
140-
String sessId2 = "sess02";
141-
142-
String destPatternIbm = "/topic/PRICE.STOCK.*.IBM";
143-
String destNasdaqIbm = "/topic/PRICE.STOCK.NASDAQ.IBM";
144-
String destNyseIdm = "/topic/PRICE.STOCK.NYSE.IBM";
145-
String destNasdaqGoogle = "/topic/PRICE.STOCK.NASDAQ.GOOG";
146-
147-
String sessId1ToDestPatternIbm = "subs01";
148-
String sessId1ToDestNasdaqIbm = "subs02";
149-
String sessId2TodestNasdaqIbm = "subs03";
150-
String sessId2ToDestNyseIdm = "subs04";
151-
String sessId2ToDestNasdaqGoogle = "subs05";
152-
153-
this.registry.registerSubscription(subscribeMessage(sessId1, sessId1ToDestNasdaqIbm, destNasdaqIbm));
154-
this.registry.registerSubscription(subscribeMessage(sessId1, sessId1ToDestPatternIbm, destPatternIbm));
155-
MultiValueMap<String, String> actual = this.registry.findSubscriptions(message(destNasdaqIbm));
156-
assertEquals("Expected 1 elements " + actual, 1, actual.size());
157-
assertEquals(Arrays.asList(sessId1ToDestNasdaqIbm, sessId1ToDestPatternIbm), actual.get(sessId1));
158-
159-
this.registry.registerSubscription(subscribeMessage(sessId2, sessId2TodestNasdaqIbm, destNasdaqIbm));
160-
this.registry.registerSubscription(subscribeMessage(sessId2, sessId2ToDestNyseIdm, destNyseIdm));
161-
this.registry.registerSubscription(subscribeMessage(sessId2, sessId2ToDestNasdaqGoogle, destNasdaqGoogle));
162-
actual = this.registry.findSubscriptions(message(destNasdaqIbm));
163-
assertEquals("Expected 2 elements " + actual, 2, actual.size());
164-
assertEquals(Arrays.asList(sessId1ToDestNasdaqIbm, sessId1ToDestPatternIbm), actual.get(sessId1));
165-
assertEquals(Arrays.asList(sessId2TodestNasdaqIbm), actual.get(sessId2));
166-
167-
this.registry.unregisterAllSubscriptions(sessId1);
168-
actual = this.registry.findSubscriptions(message(destNasdaqIbm));
169-
assertEquals("Expected 1 elements " + actual, 1, actual.size());
170-
assertEquals(Arrays.asList(sessId2TodestNasdaqIbm), actual.get(sessId2));
171-
172-
this.registry.registerSubscription(subscribeMessage(sessId1, sessId1ToDestPatternIbm, destPatternIbm));
173-
this.registry.registerSubscription(subscribeMessage(sessId1, sessId1ToDestNasdaqIbm, destNasdaqIbm));
174-
actual = this.registry.findSubscriptions(message(destNasdaqIbm));
175-
assertEquals("Expected 2 elements " + actual, 2, actual.size());
176-
assertEquals(Arrays.asList(sessId1ToDestPatternIbm, sessId1ToDestNasdaqIbm), actual.get(sessId1));
177-
assertEquals(Arrays.asList(sessId2TodestNasdaqIbm), actual.get(sessId2));
178-
179-
this.registry.unregisterSubscription(unsubscribeMessage(sessId1, sessId1ToDestNasdaqIbm));
180-
actual = this.registry.findSubscriptions(message(destNasdaqIbm));
181-
assertEquals("Expected 2 elements " + actual, 2, actual.size());
182-
assertEquals(Arrays.asList(sessId1ToDestPatternIbm), actual.get(sessId1));
183-
assertEquals(Arrays.asList(sessId2TodestNasdaqIbm), actual.get(sessId2));
184-
this.registry.unregisterSubscription(unsubscribeMessage(sessId1, sessId1ToDestPatternIbm));
185-
assertEquals("Expected 1 elements " + actual, 1, actual.size());
186-
assertEquals(Arrays.asList(sessId2TodestNasdaqIbm), actual.get(sessId2));
187-
188-
this.registry.unregisterSubscription(unsubscribeMessage(sessId2, sessId2TodestNasdaqIbm));
189-
assertEquals("Expected 0 element " + actual, 0, actual.size());
137+
public void registerSubscriptionsWithSimpleAndPatternDestinations() {
138+
139+
String sess1 = "sess01";
140+
String sess2 = "sess02";
141+
142+
String subs1 = "subs01";
143+
String subs2 = "subs02";
144+
String subs3 = "subs03";
145+
146+
this.registry.registerSubscription(subscribeMessage(sess1, subs2, "/topic/PRICE.STOCK.NASDAQ.IBM"));
147+
this.registry.registerSubscription(subscribeMessage(sess1, subs1, "/topic/PRICE.STOCK.*.IBM"));
148+
MultiValueMap<String, String> actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
149+
assertEquals(1, actual.size());
150+
assertEquals(Arrays.asList(subs2, subs1), actual.get(sess1));
151+
152+
this.registry.registerSubscription(subscribeMessage(sess2, subs1, "/topic/PRICE.STOCK.NASDAQ.IBM"));
153+
this.registry.registerSubscription(subscribeMessage(sess2, subs2, "/topic/PRICE.STOCK.NYSE.IBM"));
154+
this.registry.registerSubscription(subscribeMessage(sess2, subs3, "/topic/PRICE.STOCK.NASDAQ.GOOG"));
155+
actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
156+
assertEquals(2, actual.size());
157+
assertEquals(Arrays.asList(subs2, subs1), actual.get(sess1));
158+
assertEquals(Arrays.asList(subs1), actual.get(sess2));
159+
160+
this.registry.unregisterAllSubscriptions(sess1);
161+
actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
162+
assertEquals(1, actual.size());
163+
assertEquals(Arrays.asList(subs1), actual.get(sess2));
164+
165+
this.registry.registerSubscription(subscribeMessage(sess1, subs1, "/topic/PRICE.STOCK.*.IBM"));
166+
this.registry.registerSubscription(subscribeMessage(sess1, subs2, "/topic/PRICE.STOCK.NASDAQ.IBM"));
167+
actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
168+
assertEquals(2, actual.size());
169+
assertEquals(Arrays.asList(subs1, subs2), actual.get(sess1));
170+
assertEquals(Arrays.asList(subs1), actual.get(sess2));
171+
172+
this.registry.unregisterSubscription(unsubscribeMessage(sess1, subs2));
173+
actual = this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
174+
assertEquals(2, actual.size());
175+
assertEquals(Arrays.asList(subs1), actual.get(sess1));
176+
assertEquals(Arrays.asList(subs1), actual.get(sess2));
177+
178+
this.registry.unregisterSubscription(unsubscribeMessage(sess1, subs1));
179+
assertEquals(1, actual.size());
180+
assertEquals(Arrays.asList(subs1), actual.get(sess2));
181+
182+
this.registry.unregisterSubscription(unsubscribeMessage(sess2, subs1));
183+
assertEquals(0, actual.size());
190184
}
191185

192186
@Test

0 commit comments

Comments
 (0)