1313import java .util .UUID ;
1414import java .util .concurrent .CopyOnWriteArrayList ;
1515
16+ import java8 .util .concurrent .CompletableFuture ;
1617import rx .Completable ;
1718import rx .Observable ;
1819import rx .Subscriber ;
1920import rx .Subscription ;
2021import rx .observables .ConnectableObservable ;
22+ import rx .schedulers .Schedulers ;
2123import rx .subjects .PublishSubject ;
2224import ua .naiksoftware .stomp .ConnectionProvider ;
2325import ua .naiksoftware .stomp .LifecycleEvent ;
@@ -37,18 +39,23 @@ public class StompClient {
3739 private Subscription mMessagesSubscription;
3840 private Map<String, Set<Subscriber<? super StompMessage>>> mSubscribers = new HashMap<>();
3941 */
40- private List <Completable > mWaitConnectionCompletables ;
42+ // private List<Completable> mWaitConnectionCompletables;
4143 private final ConnectionProvider mConnectionProvider ;
4244 private HashMap <String , String > mTopics ;
4345 private boolean mConnected ;
4446 private boolean isConnecting ;
4547
4648 private PublishSubject <StompMessage > mMessageStream ;
49+ private CompletableFuture <Boolean > connectionStatus ;
50+ private Completable waitForConnect ;
4751
4852 public StompClient (ConnectionProvider connectionProvider ) {
4953 mConnectionProvider = connectionProvider ;
50- mWaitConnectionCompletables = new CopyOnWriteArrayList <>();
54+ // mWaitConnectionCompletables = new CopyOnWriteArrayList<>();
5155 mMessageStream = PublishSubject .create ();
56+ connectionStatus = new CompletableFuture <>();
57+ waitForConnect = Completable .fromFuture (connectionStatus ).subscribeOn (Schedulers .newThread ());
58+ waitForConnect .subscribe (() -> Log .d (TAG , "waitForConnect completed" ));
5259 }
5360
5461 /**
@@ -87,7 +94,7 @@ public void connect(List<StompHeader> _headers, boolean reconnect) {
8794 headers .add (new StompHeader (StompHeader .VERSION , SUPPORTED_VERSIONS ));
8895 if (_headers != null ) headers .addAll (_headers );
8996 mConnectionProvider .send (new StompMessage (StompCommand .CONNECT , headers , null ).compile ())
90- .subscribe ();
97+ .subscribe (() -> Log . d ( TAG , "CONNECT command sent!" ) );
9198 break ;
9299
93100 case CLOSED :
@@ -110,18 +117,18 @@ public void connect(List<StompHeader> _headers, boolean reconnect) {
110117 .subscribe (stompMessage -> {
111118 mConnected = true ;
112119 isConnecting = false ;
120+ connectionStatus .complete (true );
121+ /*
113122 for (Completable completable : mWaitConnectionCompletables) {
114123 completable.subscribe();
115124 }
116125 mWaitConnectionCompletables.clear();
126+ */
117127 });
118128 }
119129
120130 public Completable send (String destination ) {
121- return send (new StompMessage (
122- StompCommand .SEND ,
123- Collections .singletonList (new StompHeader (StompHeader .DESTINATION , destination )),
124- null ));
131+ return send (destination , null );
125132 }
126133
127134 public Completable send (String destination , String data ) {
@@ -133,10 +140,13 @@ public Completable send(String destination, String data) {
133140
134141 public Completable send (StompMessage stompMessage ) {
135142 Completable completable = mConnectionProvider .send (stompMessage .compile ());
143+ /*
136144 if (!mConnected) {
137145 mWaitConnectionCompletables.add(completable);
138146 }
139- return completable ;
147+ */
148+ waitForConnect .subscribe (() -> Log .d (TAG , "SEND waitForConnect complete, continuing!" ));
149+ return completable .startWith (waitForConnect );
140150 }
141151
142152 /*
@@ -177,7 +187,7 @@ public Observable<StompMessage> topic(String destPath, List<StompHeader> headerL
177187 else
178188 ret = mMessageStream
179189 .filter (msg -> destPath .equals (msg .findHeader (StompHeader .DESTINATION )))
180- .doOnSubscribe (() -> subscribePath (destPath , headerList ));
190+ .doOnSubscribe (() -> subscribePath (destPath , headerList ). subscribe () );
181191 // still need to figure out how to do the unsubscribes reactively... more difficult than it sounds
182192 return ret ;
183193 }
0 commit comments