Skip to content

Commit 8a41770

Browse files
Now we have Topic Unsubscribe working as intended.
Needs to be tested
1 parent da0be9c commit 8a41770

File tree

1 file changed

+17
-10
lines changed

1 file changed

+17
-10
lines changed

lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public class StompClient {
4848
private PublishSubject<StompMessage> mMessageStream;
4949
private CompletableFuture<Boolean> connectionStatus;
5050
private Completable waitForConnect;
51+
private HashMap<String, Observable<StompMessage>> msgStreams;
5152

5253
public StompClient(ConnectionProvider connectionProvider) {
5354
mConnectionProvider = connectionProvider;
@@ -56,6 +57,7 @@ public StompClient(ConnectionProvider connectionProvider) {
5657
connectionStatus = new CompletableFuture<>();
5758
waitForConnect = Completable.fromFuture(connectionStatus).subscribeOn(Schedulers.newThread());
5859
waitForConnect.subscribe(() -> Log.d(TAG, "waitForConnect completed"));
60+
msgStreams = new HashMap<>();
5961
}
6062

6163
/**
@@ -180,16 +182,17 @@ public Observable<StompMessage> topic(String destinationPath) {
180182
}
181183

182184
public Observable<StompMessage> topic(String destPath, List<StompHeader> headerList) {
183-
Observable<StompMessage> ret;
184-
185185
if (destPath == null)
186-
ret = Observable.error(new IllegalArgumentException("Topic path cannot be null"));
187-
else
188-
ret = mMessageStream
189-
.filter(msg -> destPath.equals(msg.findHeader(StompHeader.DESTINATION)))
190-
.doOnSubscribe(() -> subscribePath(destPath, headerList).subscribe());
191-
// still need to figure out how to do the unsubscribes reactively... more difficult than it sounds
192-
return ret;
186+
return Observable.error(new IllegalArgumentException("Topic path cannot be null"));
187+
else if (!msgStreams.containsKey(destPath))
188+
msgStreams.put(destPath,
189+
mMessageStream
190+
.filter(msg -> destPath.equals(msg.findHeader(StompHeader.DESTINATION)))
191+
.doOnSubscribe(() -> subscribePath(destPath, headerList).subscribe())
192+
.doOnUnsubscribe(() -> unsubscribePath(destPath).subscribe())
193+
.share()
194+
);
195+
return msgStreams.get(destPath);
193196
}
194197

195198
/*
@@ -253,8 +256,10 @@ private Completable subscribePath(String destinationPath, List<StompHeader> head
253256
if (mTopics == null) mTopics = new HashMap<>();
254257

255258
// Only continue if we don't already have a subscription to the topic
256-
if (mTopics.containsKey(destinationPath))
259+
if (mTopics.containsKey(destinationPath)) {
260+
Log.d(TAG, "Attempted to subscribe to already-subscribed path!");
257261
return Completable.complete();
262+
}
258263

259264
mTopics.put(destinationPath, topicId);
260265
List<StompHeader> headers = new ArrayList<>();
@@ -268,6 +273,8 @@ private Completable subscribePath(String destinationPath, List<StompHeader> head
268273

269274

270275
private Completable unsubscribePath(String dest) {
276+
msgStreams.remove(dest);
277+
271278
String topicId = mTopics.get(dest);
272279
Log.d(TAG, "Unsubscribe path: " + dest + " id: " + topicId);
273280

0 commit comments

Comments
 (0)