Skip to content

Commit 96da77e

Browse files
sdeleuzerstoyanchev
authored andcommitted
Avoid ConcurrentModificationException
Removal of cached destination is now moved outside the for loop that removes subscriptions to avoid ConcurrentModificationException. Also since updateCache is a LinkedHashMap with accessOrder=true, a simple access with updateCache.get() modify the map. By iterating over updateCache.entrySet(), we avoid this update. Issue: SPR-11755
1 parent c9417d0 commit 96da77e

File tree

2 files changed

+61
-16
lines changed

2 files changed

+61
-16
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
5555
private PathMatcher pathMatcher = new AntPathMatcher();
5656

5757

58-
5958
/**
6059
* Specify the maximum number of entries for the resolved destination cache.
6160
* Default is 1024.
@@ -115,7 +114,7 @@ public void unregisterAllSubscriptions(String sessionId) {
115114

116115
@Override
117116
protected MultiValueMap<String, String> findSubscriptionsInternal(String destination, Message<?> message) {
118-
MultiValueMap<String,String> result = this.destinationCache.getSubscriptions(destination);
117+
MultiValueMap<String, String> result = this.destinationCache.getSubscriptions(destination);
119118
if (result != null) {
120119
return result;
121120
}
@@ -129,7 +128,7 @@ protected MultiValueMap<String, String> findSubscriptionsInternal(String destina
129128
}
130129
}
131130
}
132-
if(!result.isEmpty()) {
131+
if (!result.isEmpty()) {
133132
this.destinationCache.addSubscriptions(destination, result);
134133
}
135134
return result;
@@ -175,10 +174,11 @@ public void addSubscriptions(String destination, MultiValueMap<String, String> s
175174
}
176175

177176
public void updateAfterNewSubscription(String destination, String sessionId, String subsId) {
178-
synchronized(this.updateCache) {
179-
for (String cachedDestination : this.updateCache.keySet()) {
177+
synchronized (this.updateCache) {
178+
for (Map.Entry<String, MultiValueMap<String, String>> entry : this.updateCache.entrySet()) {
179+
String cachedDestination = entry.getKey();
180180
if (getPathMatcher().match(destination, cachedDestination)) {
181-
MultiValueMap<String, String> subs = this.updateCache.get(cachedDestination);
181+
MultiValueMap<String, String> subs = entry.getValue();
182182
subs.add(sessionId, subsId);
183183
this.accessCache.put(cachedDestination, new LinkedMultiValueMap<String, String>(subs));
184184
}
@@ -187,43 +187,53 @@ public void updateAfterNewSubscription(String destination, String sessionId, Str
187187
}
188188

189189
public void updateAfterRemovedSubscription(String destination, String sessionId, String subsId) {
190-
synchronized(this.updateCache) {
191-
for (String cachedDestination : this.updateCache.keySet()) {
190+
synchronized (this.updateCache) {
191+
Set<String> destinationsToRemove = new HashSet<String>();
192+
for (Map.Entry<String, MultiValueMap<String, String>> entry : this.updateCache.entrySet()) {
193+
String cachedDestination = entry.getKey();
192194
if (getPathMatcher().match(destination, cachedDestination)) {
193-
MultiValueMap<String, String> subs = this.updateCache.get(cachedDestination);
195+
MultiValueMap<String, String> subs = entry.getValue();
194196
List<String> subsIds = subs.get(sessionId);
195197
subsIds.remove(subsId);
196198
if (subsIds.isEmpty()) {
197199
subs.remove(sessionId);
198200
}
199201
if (subs.isEmpty()) {
200-
this.updateCache.remove(cachedDestination);
201-
this.accessCache.remove(cachedDestination);
202+
destinationsToRemove.add(cachedDestination);
202203
}
203204
else {
204205
this.accessCache.put(cachedDestination, new LinkedMultiValueMap<String, String>(subs));
205206
}
206207
}
207208
}
209+
for (String d : destinationsToRemove) {
210+
this.updateCache.remove(d);
211+
this.accessCache.remove(d);
212+
}
208213
}
209214
}
210215

211216
public void updateAfterRemovedSession(SessionSubscriptionInfo info) {
212-
synchronized(this.updateCache) {
217+
synchronized (this.updateCache) {
213218
for (String destination : info.getDestinations()) {
214-
for (String cachedDestination : this.updateCache.keySet()) {
219+
Set<String> destinationsToRemove = new HashSet<String>();
220+
for (Map.Entry<String, MultiValueMap<String, String>> entry : this.updateCache.entrySet()) {
221+
String cachedDestination = entry.getKey();
215222
if (getPathMatcher().match(destination, cachedDestination)) {
216-
MultiValueMap<String, String> subs = this.updateCache.get(cachedDestination);
223+
MultiValueMap<String, String> subs = entry.getValue();
217224
subs.remove(info.getSessionId());
218225
if (subs.isEmpty()) {
219-
this.updateCache.remove(cachedDestination);
220-
this.accessCache.remove(cachedDestination);
226+
destinationsToRemove.add(cachedDestination);
221227
}
222228
else {
223229
this.accessCache.put(cachedDestination,new LinkedMultiValueMap<String, String>(subs));
224230
}
225231
}
226232
}
233+
for (String d : destinationsToRemove) {
234+
this.updateCache.remove(d);
235+
this.accessCache.remove(d);
236+
}
227237
}
228238
}
229239
}

spring-messaging/src/test/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistryTests.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,41 @@ public void registerSubscriptionsWithSimpleAndPatternDestinations() {
185185
assertEquals(0, actual.size());
186186
}
187187

188+
// SPR-11755
189+
190+
@Test
191+
public void registerAndUnregisterMultipleDestinations() {
192+
193+
String sess1 = "sess01";
194+
String sess2 = "sess02";
195+
196+
String subs1 = "subs01";
197+
String subs2 = "subs02";
198+
String subs3 = "subs03";
199+
String subs4 = "subs04";
200+
String subs5 = "subs05";
201+
202+
this.registry.registerSubscription(subscribeMessage(sess1, subs1, "/topic/PRICE.STOCK.NASDAQ.IBM"));
203+
this.registry.registerSubscription(subscribeMessage(sess1, subs2, "/topic/PRICE.STOCK.NYSE.IBM"));
204+
this.registry.registerSubscription(subscribeMessage(sess1, subs3, "/topic/PRICE.STOCK.NASDAQ.GOOG"));
205+
206+
this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NYSE.IBM"));
207+
this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.GOOG"));
208+
this.registry.findSubscriptions(message("/topic/PRICE.STOCK.NASDAQ.IBM"));
209+
210+
this.registry.unregisterSubscription(unsubscribeMessage(sess1, subs1));
211+
this.registry.unregisterSubscription(unsubscribeMessage(sess1, subs2));
212+
this.registry.unregisterSubscription(unsubscribeMessage(sess1, subs3));
213+
214+
this.registry.registerSubscription(subscribeMessage(sess1, subs1, "/topic/PRICE.STOCK.NASDAQ.IBM"));
215+
this.registry.registerSubscription(subscribeMessage(sess1, subs2, "/topic/PRICE.STOCK.NYSE.IBM"));
216+
this.registry.registerSubscription(subscribeMessage(sess1, subs3, "/topic/PRICE.STOCK.NASDAQ.GOOG"));
217+
this.registry.registerSubscription(subscribeMessage(sess1, subs4, "/topic/PRICE.STOCK.NYSE.IBM"));
218+
this.registry.registerSubscription(subscribeMessage(sess2, subs5, "/topic/PRICE.STOCK.NASDAQ.GOOG"));
219+
this.registry.unregisterAllSubscriptions(sess1);
220+
this.registry.unregisterAllSubscriptions(sess2);
221+
}
222+
188223
@Test
189224
public void registerSubscriptionWithDestinationPatternRegex() {
190225

0 commit comments

Comments
 (0)