Skip to content

Commit 7ff915a

Browse files
committed
Enforce cacheLimit in DefaultSubscriptionRegistry
When the cacheLimit is reached and there is an eviction from the updateCache, the accessCache is now also updated. This change also ensures that adding a destination to the cache is protected with synchronization on the updateCache. Issue: SPR-13555
1 parent 0f70ac7 commit 7ff915a

File tree

2 files changed

+43
-93
lines changed

2 files changed

+43
-93
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -177,31 +177,13 @@ public void unregisterAllSubscriptions(String sessionId) {
177177
}
178178

179179
@Override
180-
protected MultiValueMap<String, String> findSubscriptionsInternal(String destination,
181-
Message<?> message) {
182-
183-
LinkedMultiValueMap<String, String> result = this.destinationCache.getSubscriptions(destination);
184-
if (result != null) {
185-
return filterSubscriptions(result, message);
186-
}
187-
result = new LinkedMultiValueMap<String, String>();
188-
for (SessionSubscriptionInfo info : this.subscriptionRegistry.getAllSubscriptions()) {
189-
for (String destinationPattern : info.getDestinations()) {
190-
if (this.pathMatcher.match(destinationPattern, destination)) {
191-
for (Subscription subscription : info.getSubscriptions(destinationPattern)) {
192-
result.add(info.sessionId, subscription.getId());
193-
}
194-
}
195-
}
196-
}
197-
if (!result.isEmpty()) {
198-
this.destinationCache.addSubscriptions(destination, result);
199-
}
180+
protected MultiValueMap<String, String> findSubscriptionsInternal(String destination, Message<?> message) {
181+
MultiValueMap<String, String> result = this.destinationCache.getSubscriptions(destination, message);
200182
return filterSubscriptions(result, message);
201183
}
202184

203-
private MultiValueMap<String, String> filterSubscriptions(MultiValueMap<String, String> allMatches,
204-
Message<?> message) {
185+
private MultiValueMap<String, String> filterSubscriptions(
186+
MultiValueMap<String, String> allMatches, Message<?> message) {
205187

206188
EvaluationContext context = null;
207189
MultiValueMap<String, String> result = new LinkedMultiValueMap<String, String>(allMatches.size());
@@ -264,20 +246,38 @@ private class DestinationCache {
264246
new LinkedHashMap<String, LinkedMultiValueMap<String, String>>(DEFAULT_CACHE_LIMIT, 0.75f, true) {
265247
@Override
266248
protected boolean removeEldestEntry(Map.Entry<String, LinkedMultiValueMap<String, String>> eldest) {
267-
return size() > getCacheLimit();
249+
if (size() > getCacheLimit()) {
250+
accessCache.remove(eldest.getKey());
251+
return true;
252+
}
253+
else {
254+
return false;
255+
}
268256
}
269257
};
270258

271259

272-
public LinkedMultiValueMap<String, String> getSubscriptions(String destination) {
273-
return this.accessCache.get(destination);
274-
}
275-
276-
public void addSubscriptions(String destination, LinkedMultiValueMap<String, String> subscriptions) {
277-
synchronized (this.updateCache) {
278-
this.updateCache.put(destination, subscriptions.deepCopy());
279-
this.accessCache.put(destination, subscriptions);
260+
public LinkedMultiValueMap<String, String> getSubscriptions(String destination, Message<?> message) {
261+
LinkedMultiValueMap<String, String> result = this.accessCache.get(destination);
262+
if (result == null) {
263+
synchronized (this.updateCache) {
264+
result = new LinkedMultiValueMap<String, String>();
265+
for (SessionSubscriptionInfo info : subscriptionRegistry.getAllSubscriptions()) {
266+
for (String destinationPattern : info.getDestinations()) {
267+
if (getPathMatcher().match(destinationPattern, destination)) {
268+
for (Subscription subscription : info.getSubscriptions(destinationPattern)) {
269+
result.add(info.sessionId, subscription.getId());
270+
}
271+
}
272+
}
273+
}
274+
if (!result.isEmpty()) {
275+
this.updateCache.put(destination, result.deepCopy());
276+
this.accessCache.put(destination, result);
277+
}
278+
}
280279
}
280+
return result;
281281
}
282282

283283
public void updateAfterNewSubscription(String destination, String sessionId, String subsId) {

spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java

Lines changed: 12 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,17 @@
2121
import java.util.Iterator;
2222
import java.util.List;
2323
import java.util.Map;
24-
import java.util.concurrent.CountDownLatch;
25-
import java.util.concurrent.TimeUnit;
26-
import java.util.concurrent.atomic.AtomicReference;
2724

2825
import org.junit.Test;
2926

3027
import org.springframework.messaging.Message;
3128
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
3229
import org.springframework.messaging.simp.SimpMessageType;
3330
import org.springframework.messaging.support.MessageBuilder;
34-
import org.springframework.util.AntPathMatcher;
3531
import org.springframework.util.MultiValueMap;
36-
import org.springframework.util.PathMatcher;
3732

38-
import static org.junit.Assert.*;
33+
import static org.junit.Assert.assertEquals;
34+
import static org.junit.Assert.assertNotNull;
3935

4036
/**
4137
* Test fixture for
@@ -402,37 +398,22 @@ public void findSubscriptionsReturnsMapSafeToIterateIncludingValues() throws Exc
402398
// no ConcurrentModificationException
403399
}
404400

405-
@Test // SPR-13204
406-
public void findSubscriptionsWithConcurrentUnregisterAllSubscriptions() throws Exception {
407-
final CountDownLatch iterationPausedLatch = new CountDownLatch(1);
408-
final CountDownLatch iterationResumeLatch = new CountDownLatch(1);
409-
final CountDownLatch iterationDoneLatch = new CountDownLatch(1);
410-
411-
PathMatcher pathMatcher = new PausingPathMatcher(iterationPausedLatch, iterationResumeLatch);
412-
this.registry.setPathMatcher(pathMatcher);
401+
@Test // SPR-13555
402+
public void cacheLimitExceeded() throws Exception {
403+
this.registry.setCacheLimit(1);
413404
this.registry.registerSubscription(subscribeMessage("sess1", "1", "/foo"));
414-
this.registry.registerSubscription(subscribeMessage("sess2", "1", "/foo"));
415-
416-
AtomicReference<MultiValueMap<String, String>> subscriptions = new AtomicReference<>();
417-
new Thread(() -> {
418-
subscriptions.set(registry.findSubscriptions(createMessage("/foo")));
419-
iterationDoneLatch.countDown();
420-
}).start();
405+
this.registry.registerSubscription(subscribeMessage("sess1", "2", "/bar"));
421406

422-
assertTrue(iterationPausedLatch.await(10, TimeUnit.SECONDS));
407+
assertEquals(1, this.registry.findSubscriptions(createMessage("/foo")).size());
408+
assertEquals(1, this.registry.findSubscriptions(createMessage("/bar")).size());
423409

424-
this.registry.unregisterAllSubscriptions("sess1");
425-
this.registry.unregisterAllSubscriptions("sess2");
426-
427-
iterationResumeLatch.countDown();
428-
assertTrue(iterationDoneLatch.await(10, TimeUnit.SECONDS));
410+
this.registry.registerSubscription(subscribeMessage("sess2", "1", "/foo"));
411+
this.registry.registerSubscription(subscribeMessage("sess2", "2", "/bar"));
429412

430-
MultiValueMap<String, String> result = subscriptions.get();
431-
assertNotNull(result);
432-
assertEquals(0, result.size());
413+
assertEquals(2, this.registry.findSubscriptions(createMessage("/foo")).size());
414+
assertEquals(2, this.registry.findSubscriptions(createMessage("/bar")).size());
433415
}
434416

435-
436417
private Message<?> createMessage(String destination) {
437418
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create();
438419
accessor.setDestination(destination);
@@ -468,35 +449,4 @@ private List<String> sort(List<String> list) {
468449
return list;
469450
}
470451

471-
472-
/**
473-
* An extension of AntPathMatcher with a pair of CountDownLatches to pause
474-
* while matching, allowing another thread to something, and resume when the
475-
* other thread signals it's okay to do so.
476-
*/
477-
private static class PausingPathMatcher extends AntPathMatcher {
478-
479-
private final CountDownLatch iterationPausedLatch;
480-
481-
private final CountDownLatch iterationResumeLatch;
482-
483-
public PausingPathMatcher(CountDownLatch iterationPausedLatch, CountDownLatch iterationResumeLatch) {
484-
this.iterationPausedLatch = iterationPausedLatch;
485-
this.iterationResumeLatch = iterationResumeLatch;
486-
}
487-
488-
@Override
489-
public boolean match(String pattern, String path) {
490-
try {
491-
this.iterationPausedLatch.countDown();
492-
assertTrue(this.iterationResumeLatch.await(10, TimeUnit.SECONDS));
493-
return super.match(pattern, path);
494-
}
495-
catch (InterruptedException ex) {
496-
ex.printStackTrace();
497-
return false;
498-
}
499-
}
500-
}
501-
502452
}

0 commit comments

Comments
 (0)