Skip to content

Commit 98e578a

Browse files
Added disconnect() to interface, cleaned up some Rx methods
1 parent 1952ea1 commit 98e578a

File tree

4 files changed

+66
-11
lines changed

4 files changed

+66
-11
lines changed

lib/src/main/java/ua/naiksoftware/stomp/ConnectionProvider.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package ua.naiksoftware.stomp;
22

3+
import rx.Completable;
34
import rx.Observable;
45

56
/**
@@ -23,4 +24,10 @@ public interface ConnectionProvider {
2324
* Subscribe this for receive #LifecycleEvent events
2425
*/
2526
Observable<LifecycleEvent> getLifecycleReceiver();
27+
28+
/**
29+
* Disconnects from server. This is basically a Callable.
30+
* Automatically emits Lifecycle.CLOSE
31+
*/
32+
Completable disconnect();
2633
}

lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
import okhttp3.WebSocket;
1717
import okhttp3.WebSocketListener;
1818
import okio.ByteString;
19+
import rx.Completable;
1920
import rx.Observable;
2021
import rx.Subscriber;
22+
import rx.subjects.PublishSubject;
2123

2224
/* package */ class OkHttpConnectionProvider implements ConnectionProvider {
2325

@@ -28,7 +30,9 @@
2830
private final OkHttpClient mOkHttpClient;
2931

3032
private final List<Subscriber<? super LifecycleEvent>> mLifecycleSubscribers;
33+
private final PublishSubject<LifecycleEvent> mLifecycleStream;
3134
private final List<Subscriber<? super String>> mMessagesSubscribers;
35+
private final PublishSubject<String> mMessagesStream;
3236

3337
private WebSocket openedSocked;
3438

@@ -39,10 +43,20 @@
3943
mLifecycleSubscribers = new ArrayList<>();
4044
mMessagesSubscribers = new ArrayList<>();
4145
mOkHttpClient = okHttpClient;
46+
47+
mLifecycleStream = PublishSubject.create();
48+
mMessagesStream = PublishSubject.create();
4249
}
4350

4451
@Override
4552
public Observable<String> messages() {
53+
createWebSocketConnection();
54+
// By using Subjects, we can leave the tracking of Subscribers to Rx.
55+
// Additionally, server disconnection is now handled manually
56+
// (instead of trying to support disconnecting just by unsubscribing)
57+
return mMessagesStream;
58+
59+
/*
4660
Observable<String> observable = Observable.<String>create(subscriber -> {
4761
mMessagesSubscribers.add(subscriber);
4862
@@ -61,6 +75,14 @@ public Observable<String> messages() {
6175
6276
createWebSocketConnection();
6377
return observable;
78+
*/
79+
}
80+
81+
// this used to be done automatically whenever the "subscriber list" was empty
82+
// this way is more discrete
83+
@Override
84+
public Completable disconnect() {
85+
return Completable.fromAction(() -> openedSocked.close(1000, ""));
6486
}
6587

6688
private void createWebSocketConnection() {
@@ -71,9 +93,9 @@ private void createWebSocketConnection() {
7193

7294
Request.Builder requestBuilder = new Request.Builder()
7395
.url(mUri);
74-
96+
7597
addConnectionHeadersToBuilder(requestBuilder, mConnectHttpHeaders);
76-
98+
7799
openedSocked = mOkHttpClient.newWebSocket(requestBuilder.build(),
78100
new WebSocketListener() {
79101
@Override
@@ -113,19 +135,24 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) {
113135

114136
@Override
115137
public Observable<Void> send(String stompMessage) {
116-
return Observable.create(subscriber -> {
138+
// .create(onSubscribe) is deprecated because it's unsafe
139+
return Observable.fromCallable(() -> {
117140
if (openedSocked == null) {
118-
subscriber.onError(new IllegalStateException("Not connected yet"));
141+
throw new IllegalStateException("Not connected yet");
119142
} else {
120143
Log.d(TAG, "Send STOMP message: " + stompMessage);
121144
openedSocked.send(stompMessage);
122-
subscriber.onCompleted();
145+
return null;
123146
}
124147
});
125148
}
126149

127150
@Override
128151
public Observable<LifecycleEvent> getLifecycleReceiver() {
152+
// Once again, opting to leave Subscriber tracking to Rx
153+
return mLifecycleStream;
154+
155+
/*
129156
return Observable.<LifecycleEvent>create(subscriber -> {
130157
mLifecycleSubscribers.add(subscriber);
131158
@@ -135,6 +162,7 @@ public Observable<LifecycleEvent> getLifecycleReceiver() {
135162
if (iterator.next().isUnsubscribed()) iterator.remove();
136163
}
137164
});
165+
*/
138166
}
139167

140168
private TreeMap<String, String> headersAsMap(Response response) {
@@ -154,15 +182,24 @@ private void addConnectionHeadersToBuilder(Request.Builder requestBuilder, Map<S
154182

155183
private void emitLifecycleEvent(LifecycleEvent lifecycleEvent) {
156184
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
185+
// I know Subjects are discouraged, but I think this is way cleaner than before
186+
mLifecycleStream.onNext(lifecycleEvent);
187+
188+
/*
157189
for (Subscriber<? super LifecycleEvent> subscriber : mLifecycleSubscribers) {
158190
subscriber.onNext(lifecycleEvent);
159191
}
192+
*/
160193
}
161194

162195
private void emitMessage(String stompMessage) {
163196
Log.d(TAG, "Emit STOMP message: " + stompMessage);
197+
mMessagesStream.onNext(stompMessage);
198+
199+
/*
164200
for (Subscriber<? super String> subscriber : mMessagesSubscribers) {
165201
subscriber.onNext(stompMessage);
166202
}
203+
*/
167204
}
168205
}

lib/src/main/java/ua/naiksoftware/stomp/WebSocketsConnectionProvider.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import javax.net.ssl.SSLContext;
2121
import javax.net.ssl.SSLSocketFactory;
2222

23+
import rx.Completable;
2324
import rx.Observable;
2425
import rx.Subscriber;
2526

@@ -145,6 +146,14 @@ public Observable<Void> send(String stompMessage) {
145146
});
146147
}
147148

149+
// Just to appease javac
150+
@Override
151+
public Completable disconnect() {
152+
return Completable.fromAction(() -> {
153+
throw new UnsupportedOperationException("JAVA WEB SOCKETS ARE NOT YET SUPPORTED IN THIS VERSION");
154+
});
155+
}
156+
148157
private void emitLifecycleEvent(LifecycleEvent lifecycleEvent) {
149158
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
150159
for (Subscriber<? super LifecycleEvent> subscriber : mLifecycleSubscribers) {

lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ public void connect(List<StompHeader> _headers) {
7272
public void connect(List<StompHeader> _headers, boolean reconnect) {
7373
if (reconnect) disconnect();
7474
if (mConnected) return;
75-
mConnectionProvider.getLifecycleReceiver()
75+
// why wasn't this DRY before?
76+
lifecycle()
7677
.subscribe(lifecycleEvent -> {
7778
switch (lifecycleEvent.getType()) {
7879
case OPENED:
@@ -128,9 +129,10 @@ public Observable<Void> send(String destination, String data) {
128129
public Observable<Void> send(StompMessage stompMessage) {
129130
Observable<Void> observable = mConnectionProvider.send(stompMessage.compile());
130131
if (!mConnected) {
131-
ConnectableObservable<Void> deffered = observable.publish();
132-
mWaitConnectionObservables.add(deffered);
133-
return deffered;
132+
// my inner grammar nazi
133+
ConnectableObservable<Void> deferred = observable.publish();
134+
mWaitConnectionObservables.add(deferred);
135+
return deferred;
134136
} else {
135137
return observable;
136138
}
@@ -153,8 +155,8 @@ public Observable<LifecycleEvent> lifecycle() {
153155
}
154156

155157
public void disconnect() {
156-
if (mMessagesSubscription != null) mMessagesSubscription.unsubscribe();
157-
mConnected = false;
158+
// the other things are now taken care of downstream
159+
mConnectionProvider.disconnect().subscribe();
158160
}
159161

160162
public Observable<StompMessage> topic(String destinationPath) {

0 commit comments

Comments
 (0)