Skip to content

Commit 63a868d

Browse files
sdeleuzerstoyanchev
authored andcommitted
Fix DefaultSubscriptionRegistry when using pattern destinations
DestinationCache is now used for both plain and pattern destinations. It stores internally the subscriptions map for each cached destination. Subscriptions are initially created when there is no cache for the requested destination, and are updated when subscriptions change. Issue: SPR-11657
1 parent 1afdd9b commit 63a868d

File tree

2 files changed

+122
-46
lines changed

2 files changed

+122
-46
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java

Lines changed: 62 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2013 the original author or authors.
2+
* Copyright 2002-2014 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,13 +16,9 @@
1616

1717
package org.springframework.messaging.simp.broker;
1818

19-
import java.util.Collection;
20-
import java.util.HashSet;
21-
import java.util.Map;
22-
import java.util.Set;
19+
import java.util.*;
2320
import java.util.concurrent.ConcurrentHashMap;
2421
import java.util.concurrent.ConcurrentMap;
25-
import java.util.concurrent.CopyOnWriteArraySet;
2622

2723
import org.springframework.messaging.Message;
2824
import org.springframework.util.AntPathMatcher;
@@ -34,6 +30,7 @@
3430
* A default, simple in-memory implementation of {@link SubscriptionRegistry}.
3531
*
3632
* @author Rossen Stoyanchev
33+
* @author Sebastien Deleuze
3734
* @since 4.0
3835
*/
3936
public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
@@ -59,18 +56,16 @@ public AntPathMatcher getPathMatcher() {
5956
@Override
6057
protected void addSubscriptionInternal(String sessionId, String subsId, String destination, Message<?> message) {
6158
SessionSubscriptionInfo info = this.subscriptionRegistry.addSubscription(sessionId, subsId, destination);
62-
if (!this.pathMatcher.isPattern(destination)) {
63-
this.destinationCache.mapToDestination(destination, info);
64-
}
59+
this.destinationCache.mapToDestination(destination, sessionId, subsId);
6560
}
6661

6762
@Override
68-
protected void removeSubscriptionInternal(String sessionId, String subscriptionId, Message<?> message) {
63+
protected void removeSubscriptionInternal(String sessionId, String subsId, Message<?> message) {
6964
SessionSubscriptionInfo info = this.subscriptionRegistry.getSubscriptions(sessionId);
7065
if (info != null) {
71-
String destination = info.removeSubscription(subscriptionId);
66+
String destination = info.removeSubscription(subsId);
7267
if (info.getSubscriptions(destination) == null) {
73-
this.destinationCache.unmapFromDestination(destination, info);
68+
this.destinationCache.unmapFromDestination(destination, sessionId, subsId);
7469
}
7570
}
7671
}
@@ -88,8 +83,11 @@ public void unregisterAllSubscriptions(String sessionId) {
8883

8984
@Override
9085
protected MultiValueMap<String, String> findSubscriptionsInternal(String destination, Message<?> message) {
91-
MultiValueMap<String,String> result = this.destinationCache.getSubscriptions(destination);
92-
if (result.isEmpty()) {
86+
MultiValueMap<String,String> result;
87+
if (this.destinationCache.isCachedDestination(destination)) {
88+
result = this.destinationCache.getSubscriptions(destination);
89+
}
90+
else {
9391
result = new LinkedMultiValueMap<String, String>();
9492
for (SessionSubscriptionInfo info : this.subscriptionRegistry.getAllSubscriptions()) {
9593
for (String destinationPattern : info.getDestinations()) {
@@ -100,6 +98,9 @@ protected MultiValueMap<String, String> findSubscriptionsInternal(String destina
10098
}
10199
}
102100
}
101+
if(!result.isEmpty()) {
102+
this.destinationCache.addSubscriptions(destination, result);
103+
}
103104
}
104105
return result;
105106
}
@@ -114,60 +115,77 @@ public String toString() {
114115

115116

116117
/**
117-
* Provide direct lookup of session subscriptions by destination (for non-pattern destinations).
118+
* Provide direct lookup of session subscriptions by destination
118119
*/
119120
private static class DestinationCache {
120121

122+
private AntPathMatcher pathMatcher = new AntPathMatcher();
123+
121124
// destination -> ..
122-
private final Map<String, Set<SessionSubscriptionInfo>> subscriptionsByDestination =
123-
new ConcurrentHashMap<String, Set<SessionSubscriptionInfo>>();
125+
private final Map<String, MultiValueMap<String, String>> subscriptionsByDestination =
126+
new ConcurrentHashMap<String, MultiValueMap<String, String>>();
124127

125128
private final Object monitor = new Object();
126129

127130

128-
public void mapToDestination(String destination, SessionSubscriptionInfo info) {
131+
public void addSubscriptions(String destination, MultiValueMap<String, String> subscriptions) {
132+
this.subscriptionsByDestination.put(destination, subscriptions);
133+
}
134+
135+
public void mapToDestination(String destination, String sessionId, String subsId) {
129136
synchronized(this.monitor) {
130-
Set<SessionSubscriptionInfo> registrations = this.subscriptionsByDestination.get(destination);
131-
if (registrations == null) {
132-
registrations = new CopyOnWriteArraySet<SessionSubscriptionInfo>();
133-
this.subscriptionsByDestination.put(destination, registrations);
137+
for (String cachedDestination : this.subscriptionsByDestination.keySet()) {
138+
if (this.pathMatcher.match(destination, cachedDestination)) {
139+
MultiValueMap<String, String> registrations = this.subscriptionsByDestination.get(cachedDestination);
140+
if (registrations == null) {
141+
registrations = new LinkedMultiValueMap<String, String>();
142+
}
143+
registrations.add(sessionId, subsId);
144+
}
134145
}
135-
registrations.add(info);
136146
}
137147
}
138148

139-
public void unmapFromDestination(String destination, SessionSubscriptionInfo info) {
149+
public void unmapFromDestination(String destination, String sessionId, String subsId) {
140150
synchronized(this.monitor) {
141-
Set<SessionSubscriptionInfo> infos = this.subscriptionsByDestination.get(destination);
142-
if (infos != null) {
143-
infos.remove(info);
144-
if (infos.isEmpty()) {
145-
this.subscriptionsByDestination.remove(destination);
151+
for (String cachedDestination : this.subscriptionsByDestination.keySet()) {
152+
if (this.pathMatcher.match(destination, cachedDestination)) {
153+
MultiValueMap<String, String> registrations = this.subscriptionsByDestination.get(cachedDestination);
154+
List<String> subscriptions = registrations.get(sessionId);
155+
while(subscriptions.remove(subsId));
156+
if (subscriptions.isEmpty()) {
157+
registrations.remove(sessionId);
158+
}
159+
if (registrations.isEmpty()) {
160+
this.subscriptionsByDestination.remove(cachedDestination);
161+
}
146162
}
147163
}
148164
}
149165
}
150166

151167
public void removeSessionSubscriptions(SessionSubscriptionInfo info) {
152-
for (String destination : info.getDestinations()) {
153-
unmapFromDestination(destination, info);
154-
}
155-
}
156-
157-
public MultiValueMap<String, String> getSubscriptions(String destination) {
158-
MultiValueMap<String, String> result = new LinkedMultiValueMap<String, String>();
159-
Set<SessionSubscriptionInfo> infos = this.subscriptionsByDestination.get(destination);
160-
if (infos != null) {
161-
for (SessionSubscriptionInfo info : infos) {
162-
Set<String> subscriptions = info.getSubscriptions(destination);
163-
if (subscriptions != null) {
164-
for (String subscription : subscriptions) {
165-
result.add(info.getSessionId(), subscription);
168+
synchronized(this.monitor) {
169+
for (String destination : info.getDestinations()) {
170+
for (String cachedDestination : this.subscriptionsByDestination.keySet()) {
171+
if (this.pathMatcher.match(destination, cachedDestination)) {
172+
MultiValueMap<String, String> map = this.subscriptionsByDestination.get(cachedDestination);
173+
map.remove(info.getSessionId());
174+
if (map.isEmpty()) {
175+
this.subscriptionsByDestination.remove(cachedDestination);
176+
}
166177
}
167178
}
168179
}
169180
}
170-
return result;
181+
}
182+
183+
public MultiValueMap<String, String> getSubscriptions(String destination) {
184+
return this.subscriptionsByDestination.get(destination);
185+
}
186+
187+
public boolean isCachedDestination(String destination) {
188+
return subscriptionsByDestination.containsKey(destination);
171189
}
172190

173191
@Override

spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2013 the original author or authors.
2+
* Copyright 2002-2014 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,7 +25,6 @@
2525
import org.springframework.messaging.Message;
2626
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
2727
import org.springframework.messaging.simp.SimpMessageType;
28-
import org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry;
2928
import org.springframework.messaging.support.MessageBuilder;
3029
import org.springframework.util.MultiValueMap;
3130

@@ -35,6 +34,7 @@
3534
* Test fixture for {@link org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry}.
3635
*
3736
* @author Rossen Stoyanchev
37+
* @author Sebastien Deleuze
3838
*/
3939
public class DefaultSubscriptionRegistryTests {
4040

@@ -131,6 +131,64 @@ public void registerSubscriptionWithDestinationPattern() {
131131
assertEquals(Arrays.asList(subsId), actual.get(sessId));
132132
}
133133

134+
// SPR-11657
135+
136+
@Test
137+
public void registerMultipleSubscriptionsWithOneUsingDestinationPattern() {
138+
139+
String sessId1 = "sess01";
140+
String sessId2 = "sess02";
141+
142+
String destPatternIbm = "/topic/PRICE.STOCK.*.IBM";
143+
String destNasdaqIbm = "/topic/PRICE.STOCK.NASDAQ.IBM";
144+
String destNyseIdm = "/topic/PRICE.STOCK.NYSE.IBM";
145+
String destNasdaqGoogle = "/topic/PRICE.STOCK.NASDAQ.GOOG";
146+
147+
String sessId1ToDestPatternIbm = "subs01";
148+
String sessId1ToDestNasdaqIbm = "subs02";
149+
String sessId2TodestNasdaqIbm = "subs03";
150+
String sessId2ToDestNyseIdm = "subs04";
151+
String sessId2ToDestNasdaqGoogle = "subs05";
152+
153+
this.registry.registerSubscription(subscribeMessage(sessId1, sessId1ToDestNasdaqIbm, destNasdaqIbm));
154+
this.registry.registerSubscription(subscribeMessage(sessId1, sessId1ToDestPatternIbm, destPatternIbm));
155+
MultiValueMap<String, String> actual = this.registry.findSubscriptions(message(destNasdaqIbm));
156+
assertEquals("Expected 1 elements " + actual, 1, actual.size());
157+
assertEquals(Arrays.asList(sessId1ToDestNasdaqIbm, sessId1ToDestPatternIbm), actual.get(sessId1));
158+
159+
this.registry.registerSubscription(subscribeMessage(sessId2, sessId2TodestNasdaqIbm, destNasdaqIbm));
160+
this.registry.registerSubscription(subscribeMessage(sessId2, sessId2ToDestNyseIdm, destNyseIdm));
161+
this.registry.registerSubscription(subscribeMessage(sessId2, sessId2ToDestNasdaqGoogle, destNasdaqGoogle));
162+
actual = this.registry.findSubscriptions(message(destNasdaqIbm));
163+
assertEquals("Expected 2 elements " + actual, 2, actual.size());
164+
assertEquals(Arrays.asList(sessId1ToDestNasdaqIbm, sessId1ToDestPatternIbm), actual.get(sessId1));
165+
assertEquals(Arrays.asList(sessId2TodestNasdaqIbm), actual.get(sessId2));
166+
167+
this.registry.unregisterAllSubscriptions(sessId1);
168+
actual = this.registry.findSubscriptions(message(destNasdaqIbm));
169+
assertEquals("Expected 1 elements " + actual, 1, actual.size());
170+
assertEquals(Arrays.asList(sessId2TodestNasdaqIbm), actual.get(sessId2));
171+
172+
this.registry.registerSubscription(subscribeMessage(sessId1, sessId1ToDestPatternIbm, destPatternIbm));
173+
this.registry.registerSubscription(subscribeMessage(sessId1, sessId1ToDestNasdaqIbm, destNasdaqIbm));
174+
actual = this.registry.findSubscriptions(message(destNasdaqIbm));
175+
assertEquals("Expected 2 elements " + actual, 2, actual.size());
176+
assertEquals(Arrays.asList(sessId1ToDestPatternIbm, sessId1ToDestNasdaqIbm), actual.get(sessId1));
177+
assertEquals(Arrays.asList(sessId2TodestNasdaqIbm), actual.get(sessId2));
178+
179+
this.registry.unregisterSubscription(unsubscribeMessage(sessId1, sessId1ToDestNasdaqIbm));
180+
actual = this.registry.findSubscriptions(message(destNasdaqIbm));
181+
assertEquals("Expected 2 elements " + actual, 2, actual.size());
182+
assertEquals(Arrays.asList(sessId1ToDestPatternIbm), actual.get(sessId1));
183+
assertEquals(Arrays.asList(sessId2TodestNasdaqIbm), actual.get(sessId2));
184+
this.registry.unregisterSubscription(unsubscribeMessage(sessId1, sessId1ToDestPatternIbm));
185+
assertEquals("Expected 1 elements " + actual, 1, actual.size());
186+
assertEquals(Arrays.asList(sessId2TodestNasdaqIbm), actual.get(sessId2));
187+
188+
this.registry.unregisterSubscription(unsubscribeMessage(sessId2, sessId2TodestNasdaqIbm));
189+
assertEquals("Expected 0 element " + actual, 0, actual.size());
190+
}
191+
134192
@Test
135193
public void registerSubscriptionWithDestinationPatternRegex() {
136194

0 commit comments

Comments
 (0)