Skip to content

Commit 24f989f

Browse files
Cleaning up, restoring JWS support
1 parent 98e578a commit 24f989f

File tree

5 files changed

+30
-125
lines changed

5 files changed

+30
-125
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ Yes, it's safe to pass `null` for either (or both) of the last two arguments. Th
146146

147147
Right now, the library only supports sending and receiving messages. ACK messages and transactions are not implemented yet.
148148

149-
**Additional Reading**
149+
## Additional Reading
150150

151151
- [Spring + Websockets + STOMP](https://spring.io/guides/gs/messaging-stomp-websocket/)
152152
- [STOMP Protocol](http://stomp.github.io/)

lib/src/main/java/ua/naiksoftware/stomp/ConnectionProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public interface ConnectionProvider {
1818
* onError if not connected or error detected will be called, or onCompleted id sending started
1919
* TODO: send messages with ACK
2020
*/
21-
Observable<Void> send(String stompMessage);
21+
Completable send(String stompMessage);
2222

2323
/**
2424
* Subscribe this for receive #LifecycleEvent events

lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java

Lines changed: 4 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,7 @@
22

33
import android.util.Log;
44

5-
import java.util.ArrayList;
65
import java.util.HashMap;
7-
import java.util.Iterator;
8-
import java.util.List;
96
import java.util.Map;
107
import java.util.TreeMap;
118

@@ -18,30 +15,24 @@
1815
import okio.ByteString;
1916
import rx.Completable;
2017
import rx.Observable;
21-
import rx.Subscriber;
2218
import rx.subjects.PublishSubject;
2319

24-
/* package */ class OkHttpConnectionProvider implements ConnectionProvider {
20+
class OkHttpConnectionProvider implements ConnectionProvider {
2521

2622
private static final String TAG = WebSocketsConnectionProvider.class.getSimpleName();
2723

2824
private final String mUri;
2925
private final Map<String, String> mConnectHttpHeaders;
3026
private final OkHttpClient mOkHttpClient;
3127

32-
private final List<Subscriber<? super LifecycleEvent>> mLifecycleSubscribers;
3328
private final PublishSubject<LifecycleEvent> mLifecycleStream;
34-
private final List<Subscriber<? super String>> mMessagesSubscribers;
3529
private final PublishSubject<String> mMessagesStream;
3630

3731
private WebSocket openedSocked;
3832

39-
40-
/* package */ OkHttpConnectionProvider(String uri, Map<String, String> connectHttpHeaders, OkHttpClient okHttpClient) {
33+
OkHttpConnectionProvider(String uri, Map<String, String> connectHttpHeaders, OkHttpClient okHttpClient) {
4134
mUri = uri;
4235
mConnectHttpHeaders = connectHttpHeaders != null ? connectHttpHeaders : new HashMap<>();
43-
mLifecycleSubscribers = new ArrayList<>();
44-
mMessagesSubscribers = new ArrayList<>();
4536
mOkHttpClient = okHttpClient;
4637

4738
mLifecycleStream = PublishSubject.create();
@@ -51,35 +42,9 @@
5142
@Override
5243
public Observable<String> messages() {
5344
createWebSocketConnection();
54-
// By using Subjects, we can leave the tracking of Subscribers to Rx.
55-
// Additionally, server disconnection is now handled manually
56-
// (instead of trying to support disconnecting just by unsubscribing)
5745
return mMessagesStream;
58-
59-
/*
60-
Observable<String> observable = Observable.<String>create(subscriber -> {
61-
mMessagesSubscribers.add(subscriber);
62-
63-
}).doOnUnsubscribe(() -> {
64-
Iterator<Subscriber<? super String>> iterator = mMessagesSubscribers.iterator();
65-
while (iterator.hasNext()) {
66-
if (iterator.next().isUnsubscribed()) iterator.remove();
67-
}
68-
69-
if (mMessagesSubscribers.size() < 1) {
70-
Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread());
71-
openedSocked.close(1000, "");
72-
openedSocked = null;
73-
}
74-
});
75-
76-
createWebSocketConnection();
77-
return observable;
78-
*/
7946
}
8047

81-
// this used to be done automatically whenever the "subscriber list" was empty
82-
// this way is more discrete
8348
@Override
8449
public Completable disconnect() {
8550
return Completable.fromAction(() -> openedSocked.close(1000, ""));
@@ -134,9 +99,8 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) {
13499
}
135100

136101
@Override
137-
public Observable<Void> send(String stompMessage) {
138-
// .create(onSubscribe) is deprecated because it's unsafe
139-
return Observable.fromCallable(() -> {
102+
public Completable send(String stompMessage) {
103+
return Completable.fromCallable(() -> {
140104
if (openedSocked == null) {
141105
throw new IllegalStateException("Not connected yet");
142106
} else {
@@ -149,20 +113,7 @@ public Observable<Void> send(String stompMessage) {
149113

150114
@Override
151115
public Observable<LifecycleEvent> getLifecycleReceiver() {
152-
// Once again, opting to leave Subscriber tracking to Rx
153116
return mLifecycleStream;
154-
155-
/*
156-
return Observable.<LifecycleEvent>create(subscriber -> {
157-
mLifecycleSubscribers.add(subscriber);
158-
159-
}).doOnUnsubscribe(() -> {
160-
Iterator<Subscriber<? super LifecycleEvent>> iterator = mLifecycleSubscribers.iterator();
161-
while (iterator.hasNext()) {
162-
if (iterator.next().isUnsubscribed()) iterator.remove();
163-
}
164-
});
165-
*/
166117
}
167118

168119
private TreeMap<String, String> headersAsMap(Response response) {
@@ -182,24 +133,11 @@ private void addConnectionHeadersToBuilder(Request.Builder requestBuilder, Map<S
182133

183134
private void emitLifecycleEvent(LifecycleEvent lifecycleEvent) {
184135
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
185-
// I know Subjects are discouraged, but I think this is way cleaner than before
186136
mLifecycleStream.onNext(lifecycleEvent);
187-
188-
/*
189-
for (Subscriber<? super LifecycleEvent> subscriber : mLifecycleSubscribers) {
190-
subscriber.onNext(lifecycleEvent);
191-
}
192-
*/
193137
}
194138

195139
private void emitMessage(String stompMessage) {
196140
Log.d(TAG, "Emit STOMP message: " + stompMessage);
197141
mMessagesStream.onNext(stompMessage);
198-
199-
/*
200-
for (Subscriber<? super String> subscriber : mMessagesSubscribers) {
201-
subscriber.onNext(stompMessage);
202-
}
203-
*/
204142
}
205143
}

lib/src/main/java/ua/naiksoftware/stomp/WebSocketsConnectionProvider.java

Lines changed: 23 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,8 @@
1010
import org.java_websocket.handshake.ServerHandshake;
1111

1212
import java.net.URI;
13-
import java.util.ArrayList;
1413
import java.util.HashMap;
1514
import java.util.Iterator;
16-
import java.util.List;
1715
import java.util.Map;
1816
import java.util.TreeMap;
1917

@@ -22,55 +20,47 @@
2220

2321
import rx.Completable;
2422
import rx.Observable;
25-
import rx.Subscriber;
23+
import rx.subjects.PublishSubject;
2624

2725
/**
2826
* Created by naik on 05.05.16.
2927
*/
30-
/* package */ class WebSocketsConnectionProvider implements ConnectionProvider {
28+
29+
class WebSocketsConnectionProvider implements ConnectionProvider {
3130

3231
private static final String TAG = WebSocketsConnectionProvider.class.getSimpleName();
3332

3433
private final String mUri;
3534
private final Map<String, String> mConnectHttpHeaders;
3635

37-
private final List<Subscriber<? super LifecycleEvent>> mLifecycleSubscribers;
38-
private final List<Subscriber<? super String>> mMessagesSubscribers;
39-
4036
private WebSocketClient mWebSocketClient;
4137
private boolean haveConnection;
4238
private TreeMap<String, String> mServerHandshakeHeaders;
4339

40+
private final PublishSubject<LifecycleEvent> mLifecycleStream;
41+
private final PublishSubject<String> mMessagesStream;
42+
4443
/**
4544
* Support UIR scheme ws://host:port/path
4645
* @param connectHttpHeaders may be null
4746
*/
48-
/* package */ WebSocketsConnectionProvider(String uri, Map<String, String> connectHttpHeaders) {
47+
WebSocketsConnectionProvider(String uri, Map<String, String> connectHttpHeaders) {
4948
mUri = uri;
5049
mConnectHttpHeaders = connectHttpHeaders != null ? connectHttpHeaders : new HashMap<>();
51-
mLifecycleSubscribers = new ArrayList<>();
52-
mMessagesSubscribers = new ArrayList<>();
50+
51+
mLifecycleStream = PublishSubject.create();
52+
mMessagesStream = PublishSubject.create();
5353
}
5454

5555
@Override
5656
public Observable<String> messages() {
57-
Observable<String> observable = Observable.<String>create(subscriber -> {
58-
mMessagesSubscribers.add(subscriber);
59-
60-
}).doOnUnsubscribe(() -> {
61-
Iterator<Subscriber<? super String>> iterator = mMessagesSubscribers.iterator();
62-
while (iterator.hasNext()) {
63-
if (iterator.next().isUnsubscribed()) iterator.remove();
64-
}
65-
66-
if (mMessagesSubscribers.size() < 1) {
67-
Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread());
68-
mWebSocketClient.close();
69-
}
70-
});
71-
7257
createWebSocketConnection();
73-
return observable;
58+
return mMessagesStream;
59+
}
60+
61+
@Override
62+
public Completable disconnect() {
63+
return Completable.fromAction(() -> mWebSocketClient.close());
7464
}
7565

7666
private void createWebSocketConnection() {
@@ -134,50 +124,30 @@ public void onError(Exception ex) {
134124
}
135125

136126
@Override
137-
public Observable<Void> send(String stompMessage) {
138-
return Observable.create(subscriber -> {
127+
public Completable send(String stompMessage) {
128+
return Completable.fromCallable(() -> {
139129
if (mWebSocketClient == null) {
140-
subscriber.onError(new IllegalStateException("Not connected yet"));
130+
throw new IllegalStateException("Not connected yet");
141131
} else {
142132
Log.d(TAG, "Send STOMP message: " + stompMessage);
143133
mWebSocketClient.send(stompMessage);
144-
subscriber.onCompleted();
134+
return null;
145135
}
146136
});
147137
}
148138

149-
// Just to appease javac
150-
@Override
151-
public Completable disconnect() {
152-
return Completable.fromAction(() -> {
153-
throw new UnsupportedOperationException("JAVA WEB SOCKETS ARE NOT YET SUPPORTED IN THIS VERSION");
154-
});
155-
}
156-
157139
private void emitLifecycleEvent(LifecycleEvent lifecycleEvent) {
158140
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
159-
for (Subscriber<? super LifecycleEvent> subscriber : mLifecycleSubscribers) {
160-
subscriber.onNext(lifecycleEvent);
161-
}
141+
mLifecycleStream.onNext(lifecycleEvent);
162142
}
163143

164144
private void emitMessage(String stompMessage) {
165145
Log.d(TAG, "Emit STOMP message: " + stompMessage);
166-
for (Subscriber<? super String> subscriber : mMessagesSubscribers) {
167-
subscriber.onNext(stompMessage);
168-
}
146+
mMessagesStream.onNext(stompMessage);
169147
}
170148

171149
@Override
172150
public Observable<LifecycleEvent> getLifecycleReceiver() {
173-
return Observable.<LifecycleEvent>create(subscriber -> {
174-
mLifecycleSubscribers.add(subscriber);
175-
176-
}).doOnUnsubscribe(() -> {
177-
Iterator<Subscriber<? super LifecycleEvent>> iterator = mLifecycleSubscribers.iterator();
178-
while (iterator.hasNext()) {
179-
if (iterator.next().isUnsubscribed()) iterator.remove();
180-
}
181-
});
151+
return mLifecycleStream;
182152
}
183153
}

lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ public void connect(List<StompHeader> _headers) {
7272
public void connect(List<StompHeader> _headers, boolean reconnect) {
7373
if (reconnect) disconnect();
7474
if (mConnected) return;
75-
// why wasn't this DRY before?
7675
lifecycle()
7776
.subscribe(lifecycleEvent -> {
7877
switch (lifecycleEvent.getType()) {
@@ -127,9 +126,8 @@ public Observable<Void> send(String destination, String data) {
127126
}
128127

129128
public Observable<Void> send(StompMessage stompMessage) {
130-
Observable<Void> observable = mConnectionProvider.send(stompMessage.compile());
129+
Observable<Void> observable = mConnectionProvider.send(stompMessage.compile()).toObservable();
131130
if (!mConnected) {
132-
// my inner grammar nazi
133131
ConnectableObservable<Void> deferred = observable.publish();
134132
mWaitConnectionObservables.add(deferred);
135133
return deferred;
@@ -155,7 +153,6 @@ public Observable<LifecycleEvent> lifecycle() {
155153
}
156154

157155
public void disconnect() {
158-
// the other things are now taken care of downstream
159156
mConnectionProvider.disconnect().subscribe();
160157
}
161158

0 commit comments

Comments
 (0)