11package ua .naiksoftware .stomp ;
22
3+ import android .annotation .SuppressLint ;
34import android .support .annotation .NonNull ;
45import android .support .annotation .Nullable ;
56import android .util .Log ;
@@ -55,7 +56,6 @@ public class StompClient {
5556
5657 public StompClient (ConnectionProvider connectionProvider ) {
5758 mConnectionProvider = connectionProvider ;
58- mMessageStream = PublishSubject .create ();
5959 mStreamMap = new ConcurrentHashMap <>();
6060 mConnectionStream = BehaviorSubject .createDefault (false );
6161 pathMatcher = new SimplePathMatcher ();
@@ -123,14 +123,11 @@ public void connect(@Nullable List<StompHeader> _headers) {
123123
124124 case CLOSED :
125125 Log .d (TAG , "Socket closed" );
126- setConnected (false );
127- isConnecting = false ;
126+ disconnect ();
128127 break ;
129128
130129 case ERROR :
131130 Log .d (TAG , "Socket closed with error" );
132- setConnected (false );
133- isConnecting = false ;
134131 break ;
135132
136133 case FAILED_SERVER_HEARTBEAT :
@@ -143,14 +140,21 @@ public void connect(@Nullable List<StompHeader> _headers) {
143140 isConnecting = true ;
144141 mMessagesDisposable = mConnectionProvider .messages ()
145142 .map (StompMessage ::from )
146- .doOnNext (this :: callSubscribers )
143+ .doOnNext (getMessageStream ():: onNext )
147144 .filter (msg -> msg .getStompCommand ().equals (StompCommand .CONNECTED ))
148145 .subscribe (stompMessage -> {
149146 setConnected (true );
150147 isConnecting = false ;
151148 });
152149 }
153150
151+ private PublishSubject <StompMessage > getMessageStream () {
152+ if (mMessageStream == null || mMessageStream .hasComplete ()) {
153+ mMessageStream = PublishSubject .create ();
154+ }
155+ return mMessageStream ;
156+ }
157+
154158 private void setConnected (boolean connected ) {
155159 mConnected = connected ;
156160 mConnectionStream .onNext (mConnected );
@@ -159,6 +163,7 @@ private void setConnected(boolean connected) {
159163 /**
160164 * Disconnect from server, and then reconnect with the last-used headers
161165 */
166+ @ SuppressLint ("CheckResult" )
162167 public void reconnect () {
163168 disconnectCompletable ()
164169 .subscribe (() -> connect (mHeaders ),
@@ -186,33 +191,33 @@ public Completable send(@NonNull StompMessage stompMessage) {
186191 .startWith (connectionComplete );
187192 }
188193
189- private void callSubscribers (StompMessage stompMessage ) {
190- mMessageStream .onNext (stompMessage );
191- }
192-
193194 public Flowable <LifecycleEvent > lifecycle () {
194195 return mConnectionProvider .lifecycle ().toFlowable (BackpressureStrategy .BUFFER );
195196 }
196197
197198 public void disconnect () {
198- disconnectCompletable ().subscribe (() -> {}, e -> Log .e (tag , "Disconnect error" , e ));
199- if (mStreamMap != null && !mStreamMap .isEmpty ()){
200- mStreamMap .clear ();
201- }
202- if (mTopics != null && !mTopics .isEmpty ()){
203- mTopics .clear ();
199+ if (!mConnected ) {
200+ Log .d (TAG , "Skip disconnect, already not connected" );
201+ return ;
204202 }
203+ disconnectCompletable ().subscribe (() -> {}, e -> Log .e (tag , "Disconnect error" , e ));
205204 }
206205
207206 public Completable disconnectCompletable () {
208- if (mLifecycleDisposable != null ) {
209- mLifecycleDisposable .dispose ();
210- }
211- if (mMessagesDisposable != null ) {
212- mMessagesDisposable .dispose ();
213- }
214207 return mConnectionProvider .disconnect ()
215- .doOnComplete (() -> setConnected (false ));
208+ .doOnComplete (() -> {
209+ setConnected (false );
210+ isConnecting = false ;
211+ if (mLifecycleDisposable != null ) {
212+ mLifecycleDisposable .dispose ();
213+ }
214+ if (mMessagesDisposable != null ) {
215+ mMessagesDisposable .dispose ();
216+ }
217+ if (mMessageStream != null && !mMessageStream .hasComplete ()) {
218+ mMessageStream .onComplete ();
219+ }
220+ });
216221 }
217222
218223 public Flowable <StompMessage > topic (String destinationPath ) {
@@ -224,7 +229,7 @@ public Flowable<StompMessage> topic(@NonNull String destPath, List<StompHeader>
224229 return Flowable .error (new IllegalArgumentException ("Topic path cannot be null" ));
225230 else if (!mStreamMap .containsKey (destPath ))
226231 mStreamMap .put (destPath ,
227- mMessageStream
232+ getMessageStream ()
228233 .filter (msg -> pathMatcher .matches (destPath , msg ))
229234 .toFlowable (BackpressureStrategy .BUFFER )
230235 .doOnSubscribe (disposable -> subscribePath (destPath , headerList ).subscribe ())
@@ -294,6 +299,11 @@ private Completable unsubscribePath(String dest) {
294299
295300 Log .d (TAG , "Unsubscribe path: " + dest + " id: " + topicId );
296301
302+ if (!mConnected ) {
303+ Log .d (TAG , "Not connected, skip sending Unsubscribe frame to " + dest );
304+ return Completable .complete ();
305+ }
306+
297307 return send (new StompMessage (StompCommand .UNSUBSCRIBE ,
298308 Collections .singletonList (new StompHeader (StompHeader .ID , topicId )), null ));
299309 }
0 commit comments