1010import java .util .UUID ;
1111import java .util .concurrent .ConcurrentHashMap ;
1212
13- import java8 .util .StringJoiner ;
14- import java8 .util .concurrent .CompletableFuture ;
15- import rx .Completable ;
16- import rx .Observable ;
17- import rx .Subscription ;
18- import rx .schedulers .Schedulers ;
19- import rx .subjects .PublishSubject ;
13+ import io .reactivex .BackpressureStrategy ;
14+ import io .reactivex .Completable ;
15+ import io .reactivex .CompletableSource ;
16+ import io .reactivex .Flowable ;
17+ import io .reactivex .disposables .Disposable ;
18+ import io .reactivex .subjects .PublishSubject ;
2019import ua .naiksoftware .stomp .ConnectionProvider ;
2120import ua .naiksoftware .stomp .LifecycleEvent ;
2221import ua .naiksoftware .stomp .StompHeader ;
@@ -39,28 +38,20 @@ public class StompClient {
3938 private boolean legacyWhitespace ;
4039
4140 private PublishSubject <StompMessage > mMessageStream ;
42- private CompletableFuture <Boolean > mConnectionFuture ;
43- private Completable mConnectionComplete ;
44- private ConcurrentHashMap <String , Observable <StompMessage >> mStreamMap ;
41+ private ConcurrentHashMap <String , Flowable <StompMessage >> mStreamMap ;
4542 private Parser parser ;
46- private Subscription lifecycleSub ;
47- private Subscription messagesSubscription ;
43+ private Disposable mLifecycleDisposable ;
44+ private Disposable mMessagesDisposable ;
4845 private List <StompHeader > mHeaders ;
4946 private int heartbeat ;
5047
5148 public StompClient (ConnectionProvider connectionProvider ) {
5249 mConnectionProvider = connectionProvider ;
5350 mMessageStream = PublishSubject .create ();
5451 mStreamMap = new ConcurrentHashMap <>();
55- resetStatus ();
5652 parser = Parser .NONE ;
5753 }
5854
59- private void resetStatus () {
60- mConnectionFuture = new CompletableFuture <>();
61- mConnectionComplete = Completable .fromFuture (mConnectionFuture ).subscribeOn (Schedulers .newThread ());
62- }
63-
6455 public enum Parser {
6556 NONE ,
6657 RABBITMQ
@@ -109,7 +100,7 @@ public void connect(@Nullable List<StompHeader> _headers) {
109100 mHeaders = _headers ;
110101
111102 if (mConnected ) return ;
112- lifecycleSub = lifecycle ()
103+ mLifecycleDisposable = mConnectionProvider . lifecycle ()
113104 .subscribe (lifecycleEvent -> {
114105 switch (lifecycleEvent .getType ()) {
115106 case OPENED :
@@ -134,14 +125,14 @@ public void connect(@Nullable List<StompHeader> _headers) {
134125 });
135126
136127 isConnecting = true ;
137- messagesSubscription = mConnectionProvider .messages ()
128+ mMessagesDisposable = mConnectionProvider .messages ()
138129 .map (StompMessage ::from )
139130 .doOnNext (this ::callSubscribers )
140131 .filter (msg -> msg .getStompCommand ().equals (StompCommand .CONNECTED ))
141132 .subscribe (stompMessage -> {
142133 mConnected = true ;
143134 isConnecting = false ;
144- mConnectionFuture .complete (true );
135+ // mConnectionFuture.complete(true);
145136 });
146137 }
147138
@@ -166,37 +157,42 @@ public Completable send(String destination, String data) {
166157
167158 public Completable send (@ NonNull StompMessage stompMessage ) {
168159 Completable completable = mConnectionProvider .send (stompMessage .compile (legacyWhitespace ));
169- return completable .startWith (mConnectionComplete );
160+ CompletableSource connectionComplete = mConnectionProvider .connected ()
161+ .filter (isConnected -> isConnected )
162+ .firstOrError ().toCompletable ();
163+ return completable
164+ .startWith (mConnectionProvider .disconnect ())
165+ .startWith (connectionComplete );
170166 }
171167
172168 private void callSubscribers (StompMessage stompMessage ) {
173169 mMessageStream .onNext (stompMessage );
174170 }
175171
176- public Observable <LifecycleEvent > lifecycle () {
177- return mConnectionProvider .getLifecycleReceiver ( );
172+ public Flowable <LifecycleEvent > lifecycle () {
173+ return mConnectionProvider .lifecycle (). toFlowable ( BackpressureStrategy . BUFFER );
178174 }
179175
180176 public void disconnect () {
181- resetStatus ();
182- lifecycleSub .unsubscribe ();
183- messagesSubscription .unsubscribe ();
177+ mLifecycleDisposable .dispose ();
178+ mMessagesDisposable .dispose ();
184179 mConnectionProvider .disconnect ().subscribe (() -> mConnected = false );
185180 }
186181
187- public Observable <StompMessage > topic (String destinationPath ) {
182+ public Flowable <StompMessage > topic (String destinationPath ) {
188183 return topic (destinationPath , null );
189184 }
190185
191- public Observable <StompMessage > topic (@ NonNull String destPath , List <StompHeader > headerList ) {
186+ public Flowable <StompMessage > topic (@ NonNull String destPath , List <StompHeader > headerList ) {
192187 if (destPath == null )
193- return Observable .error (new IllegalArgumentException ("Topic path cannot be null" ));
188+ return Flowable .error (new IllegalArgumentException ("Topic path cannot be null" ));
194189 else if (!mStreamMap .containsKey (destPath ))
195190 mStreamMap .put (destPath ,
196191 mMessageStream
197192 .filter (msg -> matches (destPath , msg ))
198- .doOnSubscribe (() -> subscribePath (destPath , headerList ).subscribe ())
199- .doOnUnsubscribe (() -> unsubscribePath (destPath ).subscribe ())
193+ .toFlowable (BackpressureStrategy .BUFFER )
194+ .doOnSubscribe (disposable -> subscribePath (destPath , headerList ).subscribe ())
195+ .doFinally (() -> unsubscribePath (destPath ).subscribe ())
200196 .share ()
201197 );
202198 return mStreamMap .get (destPath );
@@ -250,10 +246,12 @@ private boolean matches(String path, StompMessage msg) {
250246 }
251247 }
252248 // at this point, 'transformed' looks like ["lorem", "ipsum", "[^.]+", "sit"]
253- StringJoiner sj = new StringJoiner ("\\ ." );
254- for (String s : transformed )
255- sj .add (s );
256- String join = sj .toString ();
249+ StringBuilder sb = new StringBuilder ();
250+ for (String s : transformed ) {
251+ if (sb .length () > 0 ) sb .append ("\\ ." );
252+ sb .append (s );
253+ }
254+ String join = sb .toString ();
257255 // join = "lorem\.ipsum\.[^.]+\.sit"
258256
259257 ret = dest .matches (join );
0 commit comments