Skip to content

Commit 46170ca

Browse files
flolomFrancois Lolom a558367
authored andcommitted
Add a new connection provider: OkHttp
1 parent 2045f63 commit 46170ca

File tree

3 files changed

+173
-0
lines changed

3 files changed

+173
-0
lines changed

lib/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ dependencies {
3535
compile 'io.reactivex:rxjava:1.2.0'
3636
// Supported transports
3737
provided "org.java-websocket:java-websocket:1.3.2"
38+
provided 'com.squareup.okhttp3:okhttp:3.8.0'
3839
}
3940

4041
task sourcesJar(type: Jar) {
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
package ua.naiksoftware.stomp;
2+
3+
import android.util.Log;
4+
5+
import java.util.ArrayList;
6+
import java.util.HashMap;
7+
import java.util.Iterator;
8+
import java.util.List;
9+
import java.util.Map;
10+
import java.util.TreeMap;
11+
12+
import okhttp3.Headers;
13+
import okhttp3.OkHttpClient;
14+
import okhttp3.Request;
15+
import okhttp3.Response;
16+
import okhttp3.WebSocket;
17+
import okhttp3.WebSocketListener;
18+
import okio.ByteString;
19+
import rx.Observable;
20+
import rx.Subscriber;
21+
22+
/* package */ class OkHttpConnectionProvider implements ConnectionProvider {
23+
24+
private static final String TAG = WebSocketsConnectionProvider.class.getSimpleName();
25+
26+
private final String mUri;
27+
private final Map<String, String> mConnectHttpHeaders;
28+
29+
private final List<Subscriber<? super LifecycleEvent>> mLifecycleSubscribers;
30+
private final List<Subscriber<? super String>> mMessagesSubscribers;
31+
32+
private WebSocket openedSocked;
33+
34+
35+
/* package */ OkHttpConnectionProvider(String uri, Map<String, String> connectHttpHeaders) {
36+
mUri = uri;
37+
mConnectHttpHeaders = connectHttpHeaders != null ? connectHttpHeaders : new HashMap<>();
38+
mLifecycleSubscribers = new ArrayList<>();
39+
mMessagesSubscribers = new ArrayList<>();
40+
}
41+
42+
@Override
43+
public Observable<String> messages() {
44+
Observable<String> observable = Observable.<String>create(subscriber -> {
45+
mMessagesSubscribers.add(subscriber);
46+
47+
}).doOnUnsubscribe(() -> {
48+
Iterator<Subscriber<? super String>> iterator = mMessagesSubscribers.iterator();
49+
while (iterator.hasNext()) {
50+
if (iterator.next().isUnsubscribed()) iterator.remove();
51+
}
52+
53+
if (mMessagesSubscribers.size() < 1) {
54+
Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread());
55+
openedSocked.close(1000, "");
56+
openedSocked = null;
57+
}
58+
});
59+
60+
createWebSocketConnection();
61+
return observable;
62+
}
63+
64+
private void createWebSocketConnection() {
65+
66+
if (openedSocked != null) {
67+
throw new IllegalStateException("Already have connection to web socket");
68+
}
69+
70+
OkHttpClient okHttpClient = new OkHttpClient.Builder()
71+
.build();
72+
73+
Request.Builder requestBuilder = new Request.Builder()
74+
.url(mUri);
75+
76+
addConnectionHeadersToBuilder(requestBuilder, mConnectHttpHeaders);
77+
78+
openedSocked = okHttpClient.newWebSocket(requestBuilder.build(),
79+
new WebSocketListener() {
80+
@Override
81+
public void onOpen(WebSocket webSocket, Response response) {
82+
LifecycleEvent openEvent = new LifecycleEvent(LifecycleEvent.Type.OPENED);
83+
84+
TreeMap<String, String> headersAsMap = headersAsMap(response);
85+
86+
openEvent.setHandshakeResponseHeaders(headersAsMap);
87+
emitLifecycleEvent(openEvent);
88+
}
89+
90+
@Override
91+
public void onMessage(WebSocket webSocket, String text) {
92+
emitMessage(text);
93+
}
94+
95+
@Override
96+
public void onMessage(WebSocket webSocket, ByteString bytes) {
97+
emitMessage(bytes.utf8());
98+
}
99+
100+
@Override
101+
public void onClosed(WebSocket webSocket, int code, String reason) {
102+
emitLifecycleEvent(new LifecycleEvent(LifecycleEvent.Type.CLOSED));
103+
openedSocked = null;
104+
}
105+
106+
@Override
107+
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
108+
emitLifecycleEvent(new LifecycleEvent(LifecycleEvent.Type.ERROR, new Exception(t)));
109+
}
110+
}
111+
112+
);
113+
}
114+
115+
@Override
116+
public Observable<Void> send(String stompMessage) {
117+
return Observable.create(subscriber -> {
118+
if (openedSocked == null) {
119+
subscriber.onError(new IllegalStateException("Not connected yet"));
120+
} else {
121+
Log.d(TAG, "Send STOMP message: " + stompMessage);
122+
openedSocked.send(stompMessage);
123+
subscriber.onCompleted();
124+
}
125+
});
126+
}
127+
128+
@Override
129+
public Observable<LifecycleEvent> getLifecycleReceiver() {
130+
return Observable.<LifecycleEvent>create(subscriber -> {
131+
mLifecycleSubscribers.add(subscriber);
132+
133+
}).doOnUnsubscribe(() -> {
134+
Iterator<Subscriber<? super LifecycleEvent>> iterator = mLifecycleSubscribers.iterator();
135+
while (iterator.hasNext()) {
136+
if (iterator.next().isUnsubscribed()) iterator.remove();
137+
}
138+
});
139+
}
140+
141+
private TreeMap<String, String> headersAsMap(Response response) {
142+
TreeMap<String, String> headersAsMap = new TreeMap<>();
143+
Headers headers = response.headers();
144+
for (String key : headers.names()) {
145+
headersAsMap.put(key, headers.get(key));
146+
}
147+
return headersAsMap;
148+
}
149+
150+
private void addConnectionHeadersToBuilder(Request.Builder requestBuilder, Map<String, String> mConnectHttpHeaders) {
151+
for (Map.Entry<String, String> headerEntry : mConnectHttpHeaders.entrySet()) {
152+
requestBuilder.addHeader(headerEntry.getKey(), headerEntry.getValue());
153+
}
154+
}
155+
156+
private void emitLifecycleEvent(LifecycleEvent lifecycleEvent) {
157+
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
158+
for (Subscriber<? super LifecycleEvent> subscriber : mLifecycleSubscribers) {
159+
subscriber.onNext(lifecycleEvent);
160+
}
161+
}
162+
163+
private void emitMessage(String stompMessage) {
164+
Log.d(TAG, "Emit STOMP message: " + stompMessage);
165+
for (Subscriber<? super String> subscriber : mMessagesSubscribers) {
166+
subscriber.onNext(stompMessage);
167+
}
168+
}
169+
}

lib/src/main/java/ua/naiksoftware/stomp/Stomp.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
/**
1010
* Supported overlays:
1111
* - org.java_websocket.WebSocket ('org.java-websocket:Java-WebSocket:1.3.0')
12+
* - okhttp3.WebSocket ('com.squareup.okhttp3:okhttp:3.8.0')
1213
*
1314
* You can add own relay, just implement ConnectionProvider for you stomp transport,
1415
* such as web socket.
@@ -31,6 +32,8 @@ public static StompClient over(Class clazz, String uri) {
3132
public static StompClient over(Class clazz, String uri, Map<String, String> connectHttpHeaders) {
3233
if (clazz == WebSocket.class) {
3334
return createStompClient(new WebSocketsConnectionProvider(uri, connectHttpHeaders));
35+
} else if (clazz == okhttp3.WebSocket.class) {
36+
return createStompClient(new OkHttpConnectionProvider(uri, connectHttpHeaders));
3437
}
3538

3639
throw new RuntimeException("Not supported overlay transport: " + clazz.getName());

0 commit comments

Comments
 (0)