1111import java .util .Map ;
1212import java .util .Set ;
1313import java .util .UUID ;
14+ import java .util .concurrent .ConcurrentHashMap ;
1415import java .util .concurrent .CopyOnWriteArrayList ;
1516
1617import io .reactivex .BackpressureStrategy ;
@@ -33,7 +34,7 @@ public class StompClient {
3334 public static final String DEFAULT_ACK = "auto" ;
3435
3536 private Disposable mMessagesDisposable ;
36- private Map <String , Set <FlowableEmitter <? super StompMessage >>> mEmitters = new HashMap <>();
37+ private Map <String , Set <FlowableEmitter <? super StompMessage >>> mEmitters = new ConcurrentHashMap <>();
3738 private List <ConnectableFlowable <Void >> mWaitConnectionFlowables ;
3839 private final ConnectionProvider mConnectionProvider ;
3940 private HashMap <String , String > mTopics ;
@@ -177,16 +178,14 @@ public Flowable<StompMessage> topic(String destinationPath, List<StompHeader> he
177178 while (mapIterator .hasNext ()) {
178179 String destinationUrl = mapIterator .next ();
179180 Set <FlowableEmitter <? super StompMessage >> set = mEmitters .get (destinationUrl );
180- if (null != set ) {
181- Iterator <FlowableEmitter <? super StompMessage >> setIterator = set .iterator ();
182- while (setIterator .hasNext ()) {
183- FlowableEmitter <? super StompMessage > subscriber = setIterator .next ();
184- if (subscriber .isCancelled ()) {
185- setIterator .remove ();
186- if (set .size () < 1 ) {
187- mapIterator .remove ();
188- unsubscribePath (destinationUrl ).subscribe ();
189- }
181+ Iterator <FlowableEmitter <? super StompMessage >> setIterator = set .iterator ();
182+ while (setIterator .hasNext ()) {
183+ FlowableEmitter <? super StompMessage > subscriber = setIterator .next ();
184+ if (subscriber .isCancelled ()) {
185+ setIterator .remove ();
186+ if (set .size () < 1 ) {
187+ mapIterator .remove ();
188+ unsubscribePath (destinationUrl ).subscribe ();
190189 }
191190 }
192191 }
0 commit comments