Skip to content

Commit 0a179d9

Browse files
committed
Fix NPE when unsubscribe from lifecycle and emit lifecycle event in one time
1 parent 8780db3 commit 0a179d9

File tree

4 files changed

+59
-46
lines changed

4 files changed

+59
-46
lines changed

.idea/misc.xml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

example-client/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ android {
2828
dependencies {
2929
compile fileTree(dir: 'libs', include: ['*.jar'])
3030
testCompile 'junit:junit:4.12'
31-
compile 'com.android.support:appcompat-v7:25.2.0'
31+
compile 'com.android.support:appcompat-v7:25.3.1'
3232
compile 'org.java-websocket:java-websocket:1.3.2'
33-
compile 'com.android.support:recyclerview-v7:25.2.0'
33+
compile 'com.android.support:recyclerview-v7:25.3.1'
3434
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
3535
compile 'com.squareup.retrofit2:converter-gson:2.3.0'
3636
compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'

lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333

3434
private WebSocket openedSocked;
3535

36+
private final Object mLifecycleLock = new Object();
37+
3638

3739
/* package */ OkHttpConnectionProvider(String uri, Map<String, String> connectHttpHeaders, OkHttpClient okHttpClient) {
3840
mUri = uri;
@@ -45,18 +47,18 @@
4547
@Override
4648
public Flowable<String> messages() {
4749
Flowable<String> flowable = Flowable.<String>create(mMessagesEmitters::add, BackpressureStrategy.BUFFER)
48-
.doOnCancel(() -> {
49-
Iterator<FlowableEmitter<? super String>> iterator = mMessagesEmitters.iterator();
50-
while (iterator.hasNext()) {
51-
if (iterator.next().isCancelled()) iterator.remove();
52-
}
50+
.doOnCancel(() -> {
51+
Iterator<FlowableEmitter<? super String>> iterator = mMessagesEmitters.iterator();
52+
while (iterator.hasNext()) {
53+
if (iterator.next().isCancelled()) iterator.remove();
54+
}
5355

54-
if (mMessagesEmitters.size() < 1) {
55-
Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread());
56-
openedSocked.close(1000, "");
57-
openedSocked = null;
58-
}
59-
});
56+
if (mMessagesEmitters.size() < 1) {
57+
Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread());
58+
openedSocked.close(1000, "");
59+
openedSocked = null;
60+
}
61+
});
6062
createWebSocketConnection();
6163
return flowable;
6264
}
@@ -69,9 +71,9 @@ private void createWebSocketConnection() {
6971

7072
Request.Builder requestBuilder = new Request.Builder()
7173
.url(mUri);
72-
74+
7375
addConnectionHeadersToBuilder(requestBuilder, mConnectHttpHeaders);
74-
76+
7577
openedSocked = mOkHttpClient.newWebSocket(requestBuilder.build(),
7678
new WebSocketListener() {
7779
@Override
@@ -125,12 +127,14 @@ public Flowable<Void> send(String stompMessage) {
125127
@Override
126128
public Flowable<LifecycleEvent> getLifecycleReceiver() {
127129
return Flowable.<LifecycleEvent>create(mLifecycleEmitters::add, BackpressureStrategy.BUFFER)
128-
.doOnCancel(() -> {
129-
Iterator<FlowableEmitter<? super LifecycleEvent>> iterator = mLifecycleEmitters.iterator();
130-
while (iterator.hasNext()) {
131-
if (iterator.next().isCancelled()) iterator.remove();
132-
}
133-
});
130+
.doOnCancel(() -> {
131+
synchronized (mLifecycleLock) {
132+
Iterator<FlowableEmitter<? super LifecycleEvent>> iterator = mLifecycleEmitters.iterator();
133+
while (iterator.hasNext()) {
134+
if (iterator.next().isCancelled()) iterator.remove();
135+
}
136+
}
137+
});
134138
}
135139

136140
private TreeMap<String, String> headersAsMap(Response response) {
@@ -149,9 +153,11 @@ private void addConnectionHeadersToBuilder(Request.Builder requestBuilder, Map<S
149153
}
150154

151155
private void emitLifecycleEvent(LifecycleEvent lifecycleEvent) {
152-
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
153-
for (FlowableEmitter<? super LifecycleEvent> subscriber : mLifecycleEmitters) {
154-
subscriber.onNext(lifecycleEvent);
156+
synchronized (mLifecycleLock) {
157+
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
158+
for (FlowableEmitter<? super LifecycleEvent> subscriber : mLifecycleEmitters) {
159+
subscriber.onNext(lifecycleEvent);
160+
}
155161
}
156162
}
157163

lib/src/main/java/ua/naiksoftware/stomp/WebSocketsConnectionProvider.java

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,11 @@
4141
private boolean haveConnection;
4242
private TreeMap<String, String> mServerHandshakeHeaders;
4343

44+
private final Object mLifecycleLock = new Object();
45+
4446
/**
4547
* Support UIR scheme ws://host:port/path
48+
*
4649
* @param connectHttpHeaders may be null
4750
*/
4851
/* package */ WebSocketsConnectionProvider(String uri, Map<String, String> connectHttpHeaders) {
@@ -55,17 +58,17 @@
5558
@Override
5659
public Flowable<String> messages() {
5760
Flowable<String> flowable = Flowable.<String>create(mMessagesEmitters::add, BackpressureStrategy.BUFFER)
58-
.doOnCancel(() -> {
59-
Iterator<FlowableEmitter<? super String>> iterator = mMessagesEmitters.iterator();
60-
while (iterator.hasNext()) {
61-
if (iterator.next().isCancelled()) iterator.remove();
62-
}
63-
64-
if (mMessagesEmitters.size() < 1) {
65-
Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread());
66-
mWebSocketClient.close();
67-
}
68-
});
61+
.doOnCancel(() -> {
62+
Iterator<FlowableEmitter<? super String>> iterator = mMessagesEmitters.iterator();
63+
while (iterator.hasNext()) {
64+
if (iterator.next().isCancelled()) iterator.remove();
65+
}
66+
67+
if (mMessagesEmitters.size() < 1) {
68+
Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread());
69+
mWebSocketClient.close();
70+
}
71+
});
6972
createWebSocketConnection();
7073
return flowable;
7174
}
@@ -115,7 +118,7 @@ public void onError(Exception ex) {
115118
}
116119
};
117120

118-
if(mUri.startsWith("wss")) {
121+
if (mUri.startsWith("wss")) {
119122
try {
120123
SSLContext sc = SSLContext.getInstance("TLS");
121124
sc.init(null, null, null);
@@ -144,9 +147,11 @@ public Flowable<Void> send(String stompMessage) {
144147
}
145148

146149
private void emitLifecycleEvent(LifecycleEvent lifecycleEvent) {
147-
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
148-
for (FlowableEmitter<? super LifecycleEvent> emitter : mLifecycleEmitters) {
149-
emitter.onNext(lifecycleEvent);
150+
synchronized (mLifecycleLock) {
151+
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
152+
for (FlowableEmitter<? super LifecycleEvent> emitter : mLifecycleEmitters) {
153+
emitter.onNext(lifecycleEvent);
154+
}
150155
}
151156
}
152157

@@ -160,11 +165,13 @@ private void emitMessage(String stompMessage) {
160165
@Override
161166
public Flowable<LifecycleEvent> getLifecycleReceiver() {
162167
return Flowable.<LifecycleEvent>create(mLifecycleEmitters::add, BackpressureStrategy.BUFFER)
163-
.doOnCancel(() -> {
164-
Iterator<FlowableEmitter<? super LifecycleEvent>> iterator = mLifecycleEmitters.iterator();
165-
while (iterator.hasNext()) {
166-
if (iterator.next().isCancelled()) iterator.remove();
167-
}
168-
});
168+
.doOnCancel(() -> {
169+
synchronized (mLifecycleLock) {
170+
Iterator<FlowableEmitter<? super LifecycleEvent>> iterator = mLifecycleEmitters.iterator();
171+
while (iterator.hasNext()) {
172+
if (iterator.next().isCancelled()) iterator.remove();
173+
}
174+
}
175+
});
169176
}
170177
}

0 commit comments

Comments
 (0)