1313import java .util .UUID ;
1414import java .util .concurrent .CopyOnWriteArrayList ;
1515
16+ import rx .Completable ;
1617import rx .Observable ;
1718import rx .Subscriber ;
1819import rx .Subscription ;
1920import rx .observables .ConnectableObservable ;
21+ import rx .subjects .PublishSubject ;
2022import ua .naiksoftware .stomp .ConnectionProvider ;
2123import ua .naiksoftware .stomp .LifecycleEvent ;
2224import ua .naiksoftware .stomp .StompHeader ;
@@ -31,17 +33,22 @@ public class StompClient {
3133 public static final String SUPPORTED_VERSIONS = "1.1,1.0" ;
3234 public static final String DEFAULT_ACK = "auto" ;
3335
36+ /*
3437 private Subscription mMessagesSubscription;
3538 private Map<String, Set<Subscriber<? super StompMessage>>> mSubscribers = new HashMap<>();
36- private List <ConnectableObservable <Void >> mWaitConnectionObservables ;
39+ */
40+ private List <Completable > mWaitConnectionCompletables ;
3741 private final ConnectionProvider mConnectionProvider ;
3842 private HashMap <String , String > mTopics ;
3943 private boolean mConnected ;
4044 private boolean isConnecting ;
4145
46+ private PublishSubject <StompMessage > mMessageStream ;
47+
4248 public StompClient (ConnectionProvider connectionProvider ) {
4349 mConnectionProvider = connectionProvider ;
44- mWaitConnectionObservables = new CopyOnWriteArrayList <>();
50+ mWaitConnectionCompletables = new CopyOnWriteArrayList <>();
51+ mMessageStream = PublishSubject .create ();
4552 }
4653
4754 /**
@@ -96,46 +103,43 @@ public void connect(List<StompHeader> _headers, boolean reconnect) {
96103 });
97104
98105 isConnecting = true ;
99- mMessagesSubscription = mConnectionProvider .messages ()
106+ mConnectionProvider .messages ()
100107 .map (StompMessage ::from )
108+ .doOnNext (this ::callSubscribers )
109+ .filter (msg -> msg .getStompCommand ().equals (StompCommand .CONNECTED ))
101110 .subscribe (stompMessage -> {
102- if (stompMessage .getStompCommand ().equals (StompCommand .CONNECTED )) {
103- mConnected = true ;
104- isConnecting = false ;
105- for (ConnectableObservable <Void > observable : mWaitConnectionObservables ) {
106- observable .connect ();
107- }
108- mWaitConnectionObservables .clear ();
111+ mConnected = true ;
112+ isConnecting = false ;
113+ for (Completable completable : mWaitConnectionCompletables ) {
114+ completable .subscribe ();
109115 }
110- callSubscribers ( stompMessage );
116+ mWaitConnectionCompletables . clear ( );
111117 });
112118 }
113119
114- public Observable < Void > send (String destination ) {
120+ public Completable send (String destination ) {
115121 return send (new StompMessage (
116122 StompCommand .SEND ,
117123 Collections .singletonList (new StompHeader (StompHeader .DESTINATION , destination )),
118124 null ));
119125 }
120126
121- public Observable < Void > send (String destination , String data ) {
127+ public Completable send (String destination , String data ) {
122128 return send (new StompMessage (
123129 StompCommand .SEND ,
124130 Collections .singletonList (new StompHeader (StompHeader .DESTINATION , destination )),
125131 data ));
126132 }
127133
128- public Observable < Void > send (StompMessage stompMessage ) {
129- Observable < Void > observable = mConnectionProvider .send (stompMessage .compile ()). toObservable ( );
134+ public Completable send (StompMessage stompMessage ) {
135+ Completable completable = mConnectionProvider .send (stompMessage .compile ());
130136 if (!mConnected ) {
131- ConnectableObservable <Void > deferred = observable .publish ();
132- mWaitConnectionObservables .add (deferred );
133- return deferred ;
134- } else {
135- return observable ;
137+ mWaitConnectionCompletables .add (completable );
136138 }
139+ return completable ;
137140 }
138141
142+ /*
139143 private void callSubscribers(StompMessage stompMessage) {
140144 String messageDestination = stompMessage.findHeader(StompHeader.DESTINATION);
141145 for (String dest : mSubscribers.keySet()) {
@@ -147,6 +151,11 @@ private void callSubscribers(StompMessage stompMessage) {
147151 }
148152 }
149153 }
154+ */
155+
156+ private void callSubscribers (StompMessage stompMessage ) {
157+ mMessageStream .onNext (stompMessage );
158+ }
150159
151160 public Observable <LifecycleEvent > lifecycle () {
152161 return mConnectionProvider .getLifecycleReceiver ();
@@ -160,53 +169,95 @@ public Observable<StompMessage> topic(String destinationPath) {
160169 return topic (destinationPath , null );
161170 }
162171
172+ public Observable <StompMessage > topic (String destPath , List <StompHeader > headerList ) {
173+ Observable <StompMessage > ret ;
174+
175+ if (destPath == null )
176+ ret = Observable .error (new IllegalArgumentException ("Topic path cannot be null" ));
177+ else
178+ ret = mMessageStream
179+ .filter (msg -> destPath .equals (msg .findHeader (StompHeader .DESTINATION )))
180+ .doOnSubscribe (() -> subscribePath (destPath , headerList ));
181+ // still need to figure out how to do the unsubscribes reactively... more difficult than it sounds
182+ return ret ;
183+ }
184+
185+ /*
163186 public Observable<StompMessage> topic(String destinationPath, List<StompHeader> headerList) {
187+ // basically:
188+ // on SUBSCRIBE, add the observer to the Set in the mSubscribers map that's associated with the specified topic,
189+ // and send a subscribe message IF WE HAVEN'T ALREADY SUBSCRIBED TO THE TOPIC
190+ //
191+ // on UNSUBSCRIBE, remove unsubscribed observers, and remove unobserved topics
192+
193+ // on observer subscribe...
164194 return Observable.<StompMessage>create(subscriber -> {
195+ // get list of other subscribers to topic
165196 Set<Subscriber<? super StompMessage>> subscribersSet = mSubscribers.get(destinationPath);
197+ // if there are no other subscribers on topic...
166198 if (subscribersSet == null) {
199+ // create new subscriber list,
167200 subscribersSet = new HashSet<>();
201+ // and add the list to the map
168202 mSubscribers.put(destinationPath, subscribersSet);
203+ // send SUBSCRIBE message and add topic to mTopics
169204 subscribePath(destinationPath, headerList).subscribe();
170205 }
206+ // finally, now that we know that there is a list for this topic, add observer to it
171207 subscribersSet.add(subscriber);
172208
173209 }).doOnUnsubscribe(() -> {
210+ // on unsubscribe...
174211 Iterator<String> mapIterator = mSubscribers.keySet().iterator();
212+ // for each topic in the map,
175213 while (mapIterator.hasNext()) {
214+ // get topic path
176215 String destinationUrl = mapIterator.next();
216+ // get observers subscribed to this topic
177217 Set<Subscriber<? super StompMessage>> set = mSubscribers.get(destinationUrl);
178218 Iterator<Subscriber<? super StompMessage>> setIterator = set.iterator();
219+ // for each observer subscribed to this topic,
179220 while (setIterator.hasNext()) {
180221 Subscriber<? super StompMessage> subscriber = setIterator.next();
222+ // if observer is no longer subscribed,
181223 if (subscriber.isUnsubscribed()) {
224+ // remove it from the set
182225 setIterator.remove();
226+ // if there are no observers subscribed to this topic anymore...
183227 if (set.size() < 1) {
228+ // remote the set from the map
184229 mapIterator.remove();
230+ // send UNSUBSCRIBE message
185231 unsubscribePath(destinationUrl).subscribe();
186232 }
187233 }
188234 }
189235 }
190236 });
191237 }
238+ */
239+
240+ private Completable subscribePath (String destinationPath , List <StompHeader > headerList ) {
241+ String topicId = UUID .randomUUID ().toString ();
192242
193- private Observable <Void > subscribePath (String destinationPath , List <StompHeader > headerList ) {
194- if (destinationPath == null ) return Observable .empty ();
195- String topicId = UUID .randomUUID ().toString ();
243+ if (mTopics == null ) mTopics = new HashMap <>();
196244
197- if (mTopics == null ) mTopics = new HashMap <>();
198- mTopics .put (destinationPath , topicId );
199- List <StompHeader > headers = new ArrayList <>();
200- headers .add (new StompHeader (StompHeader .ID , topicId ));
201- headers .add (new StompHeader (StompHeader .DESTINATION , destinationPath ));
202- headers .add (new StompHeader (StompHeader .ACK , DEFAULT_ACK ));
203- if (headerList != null ) headers .addAll (headerList );
204- return send (new StompMessage (StompCommand .SUBSCRIBE ,
205- headers , null ));
206- }
245+ // Only continue if we don't already have a subscription to the topic
246+ if (mTopics .containsKey (destinationPath ))
247+ return Completable .complete ();
248+
249+ mTopics .put (destinationPath , topicId );
250+ List <StompHeader > headers = new ArrayList <>();
251+ headers .add (new StompHeader (StompHeader .ID , topicId ));
252+ headers .add (new StompHeader (StompHeader .DESTINATION , destinationPath ));
253+ headers .add (new StompHeader (StompHeader .ACK , DEFAULT_ACK ));
254+ if (headerList != null ) headers .addAll (headerList );
255+ return send (new StompMessage (StompCommand .SUBSCRIBE ,
256+ headers , null ));
257+ }
207258
208259
209- private Observable < Void > unsubscribePath (String dest ) {
260+ private Completable unsubscribePath (String dest ) {
210261 String topicId = mTopics .get (dest );
211262 Log .d (TAG , "Unsubscribe path: " + dest + " id: " + topicId );
212263
0 commit comments