Skip to content

Commit 2067dfc

Browse files
Abstracted reconnect logic and implemented it for JWS
(JWS is still totally untested)
1 parent 9c7a46b commit 2067dfc

File tree

3 files changed

+47
-57
lines changed

3 files changed

+47
-57
lines changed

lib/src/main/java/ua/naiksoftware/stomp/AbstractConnectionProvider.java

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44
import android.support.annotation.Nullable;
55
import android.util.Log;
66

7+
import java.util.concurrent.TimeUnit;
8+
79
import rx.Completable;
810
import rx.Observable;
11+
import rx.subjects.BehaviorSubject;
912
import rx.subjects.PublishSubject;
1013

1114
/**
@@ -22,29 +25,56 @@ abstract class AbstractConnectionProvider implements ConnectionProvider {
2225
private final PublishSubject<LifecycleEvent> mLifecycleStream;
2326
@NonNull
2427
private final PublishSubject<String> mMessagesStream;
28+
final BehaviorSubject<Boolean> mConnectionStream;
2529

2630
AbstractConnectionProvider() {
2731
mLifecycleStream = PublishSubject.create();
2832
mMessagesStream = PublishSubject.create();
33+
mConnectionStream = BehaviorSubject.create(false);
2934
}
3035

3136
@NonNull
3237
@Override
3338
public Observable<String> messages() {
34-
createWebSocketConnection();
35-
return mMessagesStream;
39+
return mMessagesStream.startWith(initSocket().toObservable());
3640
}
3741

3842
/**
39-
* Completable to close socket.
43+
* Simply close socket.
4044
* <p>
4145
* For example:
4246
* <pre>
43-
* return Completable.fromAction(() -> webSocket.close());
47+
* webSocket.close();
4448
* </pre>
4549
*/
50+
abstract void rawDisconnect();
51+
4652
@Override
47-
public abstract Completable disconnect();
53+
public Completable disconnect() {
54+
Observable<Boolean> ex = Observable.error(new IllegalStateException("Attempted to disconnect when already disconnected"));
55+
56+
Completable block = mConnectionStream
57+
.first(isConnected -> isConnected)
58+
.timeout(1, TimeUnit.SECONDS, ex)
59+
.toCompletable();
60+
61+
return Completable
62+
.fromAction(this::rawDisconnect)
63+
.startWith(block);
64+
}
65+
66+
private Completable initSocket() {
67+
Observable<Boolean> ex = Observable.error(new IllegalStateException("Attempted to connect when already connected"));
68+
69+
Completable block = mConnectionStream
70+
.first(isConnected -> !isConnected)
71+
.timeout(1, TimeUnit.SECONDS, ex)
72+
.toCompletable();
73+
74+
return Completable
75+
.fromAction(this::createWebSocketConnection)
76+
.startWith(block);
77+
}
4878

4979
/**
5080
* Most important method: connects to websocket and notifies program of messages.
@@ -61,7 +91,7 @@ public Completable send(String stompMessage) {
6191
throw new IllegalStateException("Not connected yet");
6292
} else {
6393
Log.d(TAG, "Send STOMP message: " + stompMessage);
64-
bareSend(stompMessage);
94+
rawSend(stompMessage);
6595
return null;
6696
}
6797
});
@@ -77,7 +107,7 @@ public Completable send(String stompMessage) {
77107
*
78108
* @param stompMessage message to send
79109
*/
80-
abstract void bareSend(String stompMessage);
110+
abstract void rawSend(String stompMessage);
81111

82112
/**
83113
* Get socket object.
@@ -94,6 +124,8 @@ public Completable send(String stompMessage) {
94124
void emitLifecycleEvent(@NonNull LifecycleEvent lifecycleEvent) {
95125
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
96126
mLifecycleStream.onNext(lifecycleEvent);
127+
if (lifecycleEvent.getType().equals(LifecycleEvent.Type.CLOSED))
128+
mConnectionStream.onNext(false);
97129
}
98130

99131
void emitMessage(String stompMessage) {

lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java

Lines changed: 4 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
import java.util.HashMap;
88
import java.util.Map;
99
import java.util.TreeMap;
10-
import java.util.concurrent.TimeUnit;
11-
import java.util.concurrent.TimeoutException;
1210

1311
import okhttp3.Headers;
1412
import okhttp3.OkHttpClient;
@@ -17,8 +15,6 @@
1715
import okhttp3.WebSocket;
1816
import okhttp3.WebSocketListener;
1917
import okio.ByteString;
20-
import rx.Completable;
21-
import rx.subjects.BehaviorSubject;
2218

2319
class OkHttpConnectionProvider extends AbstractConnectionProvider {
2420

@@ -27,7 +23,6 @@ class OkHttpConnectionProvider extends AbstractConnectionProvider {
2723
private final Map<String, String> mConnectHttpHeaders;
2824
private final OkHttpClient mOkHttpClient;
2925
private final String tag = OkHttpConnectionProvider.class.getSimpleName();
30-
private final BehaviorSubject<Boolean> mConnectionStream;
3126

3227
@Nullable
3328
private WebSocket openedSocked;
@@ -37,51 +32,16 @@ class OkHttpConnectionProvider extends AbstractConnectionProvider {
3732
mUri = uri;
3833
mConnectHttpHeaders = connectHttpHeaders != null ? connectHttpHeaders : new HashMap<>();
3934
mOkHttpClient = okHttpClient;
40-
mConnectionStream = BehaviorSubject.create(false);
41-
mConnectionStream.subscribe(value -> Log.d(tag, "Connection stream emitted: " + value));
4235
}
4336

4437
@NonNull
4538
@Override
46-
public Completable disconnect() {
47-
Completable block = mConnectionStream
48-
.first(isConnected -> isConnected)
49-
.timeout(1, TimeUnit.SECONDS)
50-
.doOnError(error -> {
51-
if (error.getClass().equals(TimeoutException.class))
52-
Log.e(tag, "Attempted to disconnect when already disconnected");
53-
})
54-
.toCompletable();
55-
56-
/*
57-
if (openedSocked == null) {
58-
return Completable.error(new IllegalStateException("Attempted to disconnect when already disconnected"));
59-
}
60-
*/
61-
62-
return Completable
63-
.fromAction(() -> openedSocked.close(1000, ""))
64-
.startWith(block);
39+
public void rawDisconnect() {
40+
openedSocked.close(1000, "");
6541
}
6642

6743
@Override
6844
void createWebSocketConnection() {
69-
Completable block = mConnectionStream
70-
.first(isConnected -> !isConnected)
71-
.timeout(1, TimeUnit.SECONDS)
72-
.doOnError(error -> {
73-
if (error.getClass().equals(TimeoutException.class))
74-
Log.e(tag, "Attempted to connect when already connected");
75-
})
76-
.toCompletable();
77-
block.get(); // todo: do this the right way
78-
79-
/*
80-
if (openedSocked != null) {
81-
throw new IllegalStateException("Already have connection to web socket");
82-
}
83-
*/
84-
8545
Request.Builder requestBuilder = new Request.Builder()
8646
.url(mUri);
8747

@@ -114,9 +74,8 @@ public void onMessage(WebSocket webSocket, @NonNull ByteString bytes) {
11474

11575
@Override
11676
public void onClosed(WebSocket webSocket, int code, String reason) {
117-
emitLifecycleEvent(new LifecycleEvent(LifecycleEvent.Type.CLOSED));
11877
openedSocked = null;
119-
mConnectionStream.onNext(false);
78+
emitLifecycleEvent(new LifecycleEvent(LifecycleEvent.Type.CLOSED));
12079
}
12180

12281
@Override
@@ -130,7 +89,7 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) {
13089
}
13190

13291
@Override
133-
void bareSend(String stompMessage) {
92+
void rawSend(String stompMessage) {
13493
openedSocked.send(stompMessage);
13594
}
13695

lib/src/main/java/ua/naiksoftware/stomp/WebSocketsConnectionProvider.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
import javax.net.ssl.SSLContext;
2121
import javax.net.ssl.SSLSocketFactory;
2222

23-
import rx.Completable;
24-
2523
/**
2624
* Created by naik on 05.05.16.
2725
*/
@@ -49,8 +47,8 @@ class WebSocketsConnectionProvider extends AbstractConnectionProvider {
4947

5048
@NonNull
5149
@Override
52-
public Completable disconnect() {
53-
return Completable.fromAction(() -> mWebSocketClient.close());
50+
public void rawDisconnect() {
51+
mWebSocketClient.close();
5452
}
5553

5654
@Override
@@ -112,10 +110,11 @@ public void onError(Exception ex) {
112110

113111
mWebSocketClient.connect();
114112
haveConnection = true;
113+
mConnectionStream.onNext(true);
115114
}
116115

117116
@Override
118-
void bareSend(String stompMessage) {
117+
void rawSend(String stompMessage) {
119118
mWebSocketClient.send(stompMessage);
120119
}
121120

0 commit comments

Comments
 (0)