33import android .util .Log ;
44
55import java .util .ArrayList ;
6- import java .util .Arrays ;
76import java .util .Collections ;
87import java .util .HashMap ;
98import java .util .HashSet ;
@@ -122,10 +121,6 @@ public Observable<Void> send(StompMessage stompMessage) {
122121 mWaitConnectionObservables .add (deffered );
123122 return deffered ;
124123 } else {
125- if (stompMessage .getStompCommand ().equals (StompCommand .SUBSCRIBE )){
126- ConnectableObservable <Void > send = mConnectionProvider .send (stompMessage .compile ()).publish ();
127- send .connect ();
128- }
129124 return observable ;
130125 }
131126 }
@@ -161,7 +156,7 @@ public Observable<StompMessage> topic(String destinationPath, List<StompHeader>
161156 if (subscribersSet == null ) {
162157 subscribersSet = new HashSet <>();
163158 mSubscribers .put (destinationPath , subscribersSet );
164- subscribePath (destinationPath , headerList );
159+ subscribePath (destinationPath , headerList ). subscribe () ;
165160 }
166161 subscribersSet .add (subscriber );
167162
@@ -173,16 +168,16 @@ public Observable<StompMessage> topic(String destinationPath, List<StompHeader>
173168 set .remove (subscriber );
174169 if (set .size () < 1 ) {
175170 mSubscribers .remove (dest );
176- unsubscribePath (dest );
171+ unsubscribePath (dest ). subscribe () ;
177172 }
178173 }
179174 }
180175 }
181176 });
182177 }
183178
184- private void subscribePath (String destinationPath , List <StompHeader > headerList ) {
185- if (destinationPath == null ) return ;
179+ private Observable < Void > subscribePath (String destinationPath , List <StompHeader > headerList ) {
180+ if (destinationPath == null ) return null ;
186181 String topicId = UUID .randomUUID ().toString ();
187182
188183 if (mTopics == null ) mTopics = new HashMap <>();
@@ -192,16 +187,16 @@ private void subscribePath(String destinationPath, List<StompHeader> headerList)
192187 headers .add (new StompHeader (StompHeader .DESTINATION , destinationPath ));
193188 headers .add (new StompHeader (StompHeader .ACK , DEFAULT_ACK ));
194189 if (headerList != null ) headers .addAll (headerList );
195- send (new StompMessage (StompCommand .SUBSCRIBE ,
190+ return send (new StompMessage (StompCommand .SUBSCRIBE ,
196191 headers , null ));
197192 }
198193
199194
200- private void unsubscribePath (String dest ) {
195+ private Observable < Void > unsubscribePath (String dest ) {
201196 String topicId = mTopics .get (dest );
202197 Log .d (TAG , "Unsubscribe path: " + dest + " id: " + topicId );
203198
204- send (new StompMessage (StompCommand .UNSUBSCRIBE ,
199+ return send (new StompMessage (StompCommand .UNSUBSCRIBE ,
205200 Collections .singletonList (new StompHeader (StompHeader .ID , topicId )), null ));
206201 }
207202
0 commit comments