@@ -29,6 +29,7 @@ public class StompClient {
2929 public static final String SUPPORTED_VERSIONS = "1.1,1.0" ;
3030 public static final String DEFAULT_ACK = "auto" ;
3131
32+ private final String tag = StompClient .class .getSimpleName ();
3233 private final ConnectionProvider mConnectionProvider ;
3334 private HashMap <String , String > mTopics ;
3435 private boolean mConnected ;
@@ -42,9 +43,13 @@ public class StompClient {
4243 public StompClient (ConnectionProvider connectionProvider ) {
4344 mConnectionProvider = connectionProvider ;
4445 mMessageStream = PublishSubject .create ();
46+ mStreamMap = new HashMap <>();
47+ resetStatus ();
48+ }
49+
50+ private void resetStatus () {
4551 mConnectionFuture = new CompletableFuture <>();
4652 mConnectionComplete = Completable .fromFuture (mConnectionFuture ).subscribeOn (Schedulers .newThread ());
47- mStreamMap = new HashMap <>();
4853 }
4954
5055 /**
@@ -61,7 +66,7 @@ public void connect(boolean reconnect) {
6166 /**
6267 * Connect without reconnect if connected
6368 *
64- * @param _headers might be null
69+ * @param _headers HTTP headers to send in the INITIAL REQUEST, i.e. during the protocol upgrade
6570 */
6671 public void connect (List <StompHeader > _headers ) {
6772 connect (_headers , false );
@@ -70,7 +75,7 @@ public void connect(List<StompHeader> _headers) {
7075 /**
7176 * If already connected and reconnect=false - nope
7277 *
73- * @param _headers might be null
78+ * @param _headers HTTP headers to send in the INITIAL REQUEST, i.e. during the protocol upgrade
7479 */
7580 public void connect (@ Nullable List <StompHeader > _headers , boolean reconnect ) {
7681 if (reconnect ) disconnect ();
@@ -123,7 +128,6 @@ public Completable send(String destination, String data) {
123128
124129 public Completable send (@ NonNull StompMessage stompMessage ) {
125130 Completable completable = mConnectionProvider .send (stompMessage .compile ());
126- mConnectionComplete .subscribe ();
127131 return completable .startWith (mConnectionComplete );
128132 }
129133
@@ -136,7 +140,8 @@ public Observable<LifecycleEvent> lifecycle() {
136140 }
137141
138142 public void disconnect () {
139- mConnectionProvider .disconnect ().subscribe ();
143+ resetStatus ();
144+ mConnectionProvider .disconnect ().subscribe (() -> mConnected = false );
140145 }
141146
142147 public Observable <StompMessage > topic (String destinationPath ) {
0 commit comments