|
6 | 6 | import java.util.Collections; |
7 | 7 | import java.util.HashMap; |
8 | 8 | import java.util.HashSet; |
| 9 | +import java.util.Iterator; |
9 | 10 | import java.util.List; |
10 | 11 | import java.util.Map; |
11 | 12 | import java.util.Set; |
|
15 | 16 | import rx.Subscriber; |
16 | 17 | import rx.Subscription; |
17 | 18 | import rx.observables.ConnectableObservable; |
18 | | -import rx.schedulers.Schedulers; |
19 | 19 | import ua.naiksoftware.stomp.ConnectionProvider; |
20 | 20 | import ua.naiksoftware.stomp.LifecycleEvent; |
21 | 21 | import ua.naiksoftware.stomp.StompHeader; |
@@ -171,14 +171,18 @@ public Observable<StompMessage> topic(String destinationPath, List<StompHeader> |
171 | 171 | subscribersSet.add(subscriber); |
172 | 172 |
|
173 | 173 | }).doOnUnsubscribe(() -> { |
174 | | - for (String dest : mSubscribers.keySet()) { |
175 | | - Set<Subscriber<? super StompMessage>> set = mSubscribers.get(dest); |
176 | | - for (Subscriber<? super StompMessage> subscriber : set) { |
| 174 | + Iterator<String> mapIterator = mSubscribers.keySet().iterator(); |
| 175 | + while (mapIterator.hasNext()) { |
| 176 | + String destinationUrl = mapIterator.next(); |
| 177 | + Set<Subscriber<? super StompMessage>> set = mSubscribers.get(destinationUrl); |
| 178 | + Iterator<Subscriber<? super StompMessage>> setIterator = set.iterator(); |
| 179 | + while (setIterator.hasNext()) { |
| 180 | + Subscriber<? super StompMessage> subscriber = setIterator.next(); |
177 | 181 | if (subscriber.isUnsubscribed()) { |
178 | | - set.remove(subscriber); |
| 182 | + setIterator.remove(); |
179 | 183 | if (set.size() < 1) { |
180 | | - mSubscribers.remove(dest); |
181 | | - unsubscribePath(dest).subscribe(); |
| 184 | + mapIterator.remove(); |
| 185 | + unsubscribePath(destinationUrl).subscribe(); |
182 | 186 | } |
183 | 187 | } |
184 | 188 | } |
|
0 commit comments