Skip to content

Commit 0e82ce9

Browse files
Did some cleanup, created another ConnectionProvider abstraction
1 parent 8a41770 commit 0e82ce9

File tree

5 files changed

+132
-191
lines changed

5 files changed

+132
-191
lines changed
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package ua.naiksoftware.stomp;
2+
3+
import android.util.Log;
4+
5+
import rx.Completable;
6+
import rx.Observable;
7+
import rx.subjects.PublishSubject;
8+
9+
/**
10+
* Created by forresthopkinsa on 8/8/2017.
11+
* <p>
12+
* Created because there was a lot of shared code between JWS and OkHttp connection providers.
13+
*/
14+
15+
abstract class AbstractConnectionProvider implements ConnectionProvider {
16+
17+
private static final String TAG = AbstractConnectionProvider.class.getSimpleName();
18+
19+
private final PublishSubject<LifecycleEvent> mLifecycleStream;
20+
private final PublishSubject<String> mMessagesStream;
21+
22+
AbstractConnectionProvider() {
23+
mLifecycleStream = PublishSubject.create();
24+
mMessagesStream = PublishSubject.create();
25+
}
26+
27+
@Override
28+
public Observable<String> messages() {
29+
createWebSocketConnection();
30+
return mMessagesStream;
31+
}
32+
33+
/**
34+
* Completable to close socket.
35+
* <p>
36+
* For example:
37+
* <pre>
38+
* return Completable.fromAction(() -> webSocket.close());
39+
* </pre>
40+
*/
41+
@Override
42+
public abstract Completable disconnect();
43+
44+
/**
45+
* Most important method: connects to websocket and notifies program of messages.
46+
* <p>
47+
* See implementations in OkHttpConnectionProvider and WebSocketsConnectionProvider.
48+
*/
49+
abstract void createWebSocketConnection();
50+
51+
@Override
52+
public Completable send(String stompMessage) {
53+
return Completable.fromCallable(() -> {
54+
if (getSocket() == null) {
55+
throw new IllegalStateException("Not connected yet");
56+
} else {
57+
Log.d(TAG, "Send STOMP message: " + stompMessage);
58+
bareSend(stompMessage);
59+
return null;
60+
}
61+
});
62+
}
63+
64+
/**
65+
* Just a simple message send.
66+
* <p>
67+
* For example:
68+
* <pre>
69+
* webSocket.send(stompMessage);
70+
* </pre>
71+
*
72+
* @param stompMessage message to send
73+
*/
74+
abstract void bareSend(String stompMessage);
75+
76+
/**
77+
* Get socket object.
78+
* Used for null checking; this object is expected to be null when the connection is not yet established.
79+
* <p>
80+
* For example:
81+
* <pre>
82+
* return webSocket;
83+
* </pre>
84+
*/
85+
abstract Object getSocket();
86+
87+
void emitLifecycleEvent(LifecycleEvent lifecycleEvent) {
88+
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
89+
mLifecycleStream.onNext(lifecycleEvent);
90+
}
91+
92+
void emitMessage(String stompMessage) {
93+
Log.d(TAG, "Emit STOMP message: " + stompMessage);
94+
mMessagesStream.onNext(stompMessage);
95+
}
96+
97+
@Override
98+
public Observable<LifecycleEvent> getLifecycleReceiver() {
99+
return mLifecycleStream;
100+
}
101+
}

lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java

Lines changed: 8 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package ua.naiksoftware.stomp;
22

3-
import android.util.Log;
4-
53
import java.util.HashMap;
64
import java.util.Map;
75
import java.util.TreeMap;
@@ -14,43 +12,29 @@
1412
import okhttp3.WebSocketListener;
1513
import okio.ByteString;
1614
import rx.Completable;
17-
import rx.Observable;
18-
import rx.subjects.PublishSubject;
19-
20-
class OkHttpConnectionProvider implements ConnectionProvider {
2115

22-
private static final String TAG = OkHttpConnectionProvider.class.getSimpleName();
16+
class OkHttpConnectionProvider extends AbstractConnectionProvider {
2317

2418
private final String mUri;
2519
private final Map<String, String> mConnectHttpHeaders;
2620
private final OkHttpClient mOkHttpClient;
2721

28-
private final PublishSubject<LifecycleEvent> mLifecycleStream;
29-
private final PublishSubject<String> mMessagesStream;
30-
3122
private WebSocket openedSocked;
3223

3324
OkHttpConnectionProvider(String uri, Map<String, String> connectHttpHeaders, OkHttpClient okHttpClient) {
25+
super();
3426
mUri = uri;
3527
mConnectHttpHeaders = connectHttpHeaders != null ? connectHttpHeaders : new HashMap<>();
3628
mOkHttpClient = okHttpClient;
37-
38-
mLifecycleStream = PublishSubject.create();
39-
mMessagesStream = PublishSubject.create();
40-
}
41-
42-
@Override
43-
public Observable<String> messages() {
44-
createWebSocketConnection();
45-
return mMessagesStream;
4629
}
4730

4831
@Override
4932
public Completable disconnect() {
5033
return Completable.fromAction(() -> openedSocked.close(1000, ""));
5134
}
5235

53-
private void createWebSocketConnection() {
36+
@Override
37+
void createWebSocketConnection() {
5438

5539
if (openedSocked != null) {
5640
throw new IllegalStateException("Already have connection to web socket");
@@ -99,21 +83,13 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) {
9983
}
10084

10185
@Override
102-
public Completable send(String stompMessage) {
103-
return Completable.fromCallable(() -> {
104-
if (openedSocked == null) {
105-
throw new IllegalStateException("Not connected yet");
106-
} else {
107-
Log.d(TAG, "Send STOMP message: " + stompMessage);
108-
openedSocked.send(stompMessage);
109-
return null;
110-
}
111-
});
86+
void bareSend(String stompMessage) {
87+
openedSocked.send(stompMessage);
11288
}
11389

11490
@Override
115-
public Observable<LifecycleEvent> getLifecycleReceiver() {
116-
return mLifecycleStream;
91+
Object getSocket() {
92+
return openedSocked;
11793
}
11894

11995
private TreeMap<String, String> headersAsMap(Response response) {
@@ -130,14 +106,4 @@ private void addConnectionHeadersToBuilder(Request.Builder requestBuilder, Map<S
130106
requestBuilder.addHeader(headerEntry.getKey(), headerEntry.getValue());
131107
}
132108
}
133-
134-
private void emitLifecycleEvent(LifecycleEvent lifecycleEvent) {
135-
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
136-
mLifecycleStream.onNext(lifecycleEvent);
137-
}
138-
139-
private void emitMessage(String stompMessage) {
140-
Log.d(TAG, "Emit STOMP message: " + stompMessage);
141-
mMessagesStream.onNext(stompMessage);
142-
}
143109
}

lib/src/main/java/ua/naiksoftware/stomp/Stomp.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99

1010
/**
1111
* Supported overlays:
12-
* - org.java_websocket.WebSocket ('org.java-websocket:Java-WebSocket:1.3.0')
13-
* - okhttp3.WebSocket ('com.squareup.okhttp3:okhttp:3.8.0')
12+
* - org.java_websocket.WebSocket ('org.java-websocket:Java-WebSocket:1.3.2')
13+
* - okhttp3.WebSocket ('com.squareup.okhttp3:okhttp:3.8.1')
1414
*
1515
* You can add own relay, just implement ConnectionProvider for you stomp transport,
1616
* such as web socket.

lib/src/main/java/ua/naiksoftware/stomp/WebSocketsConnectionProvider.java

Lines changed: 7 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,12 @@
1919
import javax.net.ssl.SSLSocketFactory;
2020

2121
import rx.Completable;
22-
import rx.Observable;
23-
import rx.subjects.PublishSubject;
2422

2523
/**
2624
* Created by naik on 05.05.16.
2725
*/
2826

29-
class WebSocketsConnectionProvider implements ConnectionProvider {
27+
class WebSocketsConnectionProvider extends AbstractConnectionProvider {
3028

3129
private static final String TAG = WebSocketsConnectionProvider.class.getSimpleName();
3230

@@ -37,33 +35,22 @@ class WebSocketsConnectionProvider implements ConnectionProvider {
3735
private boolean haveConnection;
3836
private TreeMap<String, String> mServerHandshakeHeaders;
3937

40-
private final PublishSubject<LifecycleEvent> mLifecycleStream;
41-
private final PublishSubject<String> mMessagesStream;
42-
4338
/**
4439
* Support UIR scheme ws://host:port/path
4540
* @param connectHttpHeaders may be null
4641
*/
4742
WebSocketsConnectionProvider(String uri, Map<String, String> connectHttpHeaders) {
4843
mUri = uri;
4944
mConnectHttpHeaders = connectHttpHeaders != null ? connectHttpHeaders : new HashMap<>();
50-
51-
mLifecycleStream = PublishSubject.create();
52-
mMessagesStream = PublishSubject.create();
53-
}
54-
55-
@Override
56-
public Observable<String> messages() {
57-
createWebSocketConnection();
58-
return mMessagesStream;
5945
}
6046

6147
@Override
6248
public Completable disconnect() {
6349
return Completable.fromAction(() -> mWebSocketClient.close());
6450
}
6551

66-
private void createWebSocketConnection() {
52+
@Override
53+
void createWebSocketConnection() {
6754
if (haveConnection)
6855
throw new IllegalStateException("Already have connection to web socket");
6956

@@ -124,30 +111,12 @@ public void onError(Exception ex) {
124111
}
125112

126113
@Override
127-
public Completable send(String stompMessage) {
128-
return Completable.fromCallable(() -> {
129-
if (mWebSocketClient == null) {
130-
throw new IllegalStateException("Not connected yet");
131-
} else {
132-
Log.d(TAG, "Send STOMP message: " + stompMessage);
133-
mWebSocketClient.send(stompMessage);
134-
return null;
135-
}
136-
});
137-
}
138-
139-
private void emitLifecycleEvent(LifecycleEvent lifecycleEvent) {
140-
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
141-
mLifecycleStream.onNext(lifecycleEvent);
142-
}
143-
144-
private void emitMessage(String stompMessage) {
145-
Log.d(TAG, "Emit STOMP message: " + stompMessage);
146-
mMessagesStream.onNext(stompMessage);
114+
void bareSend(String stompMessage) {
115+
mWebSocketClient.send(stompMessage);
147116
}
148117

149118
@Override
150-
public Observable<LifecycleEvent> getLifecycleReceiver() {
151-
return mLifecycleStream;
119+
Object getSocket() {
120+
return mWebSocketClient;
152121
}
153122
}

0 commit comments

Comments
 (0)