Skip to content

Commit 9c7a46b

Browse files
Finished fixing reconnect and its race conditions
Managed to do it without any `synchronized` blocking
1 parent d719ba6 commit 9c7a46b

File tree

4 files changed

+68
-19
lines changed

4 files changed

+68
-19
lines changed

README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ You can use this library two ways:
3030
- Using the new Native Java 8 support
3131
- As of this writing, you must be using Android Studio Canary to use this feature.
3232
- Has been tested in the following environments:
33+
- Beta 2, Gradle plugin v3.0.0-beta2
34+
- Beta 1, Gradle plugin v3.0.0-beta1
3335
- Canary 9, Gradle plugin v3.0.0-alpha9
3436
- Canary 8, Gradle plugin v3.0.0-alpha8
3537
- It *should* work in all 3.0.0+ versions
@@ -219,6 +221,21 @@ These are the possible changes you need to make to your code for this branch, if
219221
});
220222
```
221223
- Be sure to implement this change, because the IDE might not catch the error.
224+
- Removed `reconnect` parameter from `connect(...)` methods
225+
- Old, deprecated overloads of the method:
226+
``` java
227+
void connect(boolean reconnect) {...}
228+
void connect(List<StompHeader> _headers, boolean reconnect) {...}
229+
```
230+
- Now, these are the *only* two ways to call `connect` (these existed before, too):
231+
``` java
232+
void connect() {...}
233+
void connect(List<StompHeader> _headers) {...}
234+
```
235+
- Additionally, reconnection is now handled by just calling `reconnect()`
236+
- It automatically attaches the last-used connect headers
237+
- It is meant to be used when already connected; it executes `disconnect()`
238+
- Note that if you're already disconnected, this will throw a `TimeoutException`
222239
- Passing null as the topic path now throws an exception
223240
- Previously, it was supposed to silently fail, although it would probably hit a NPE first (untested)
224241
- Now it throws an IllegalArgumentException

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ buildscript {
88
}
99
}
1010
dependencies {
11-
classpath 'com.android.tools.build:gradle:3.0.0-beta1'
11+
classpath 'com.android.tools.build:gradle:3.0.0-beta2'
1212
classpath 'com.github.dcendents:android-maven-gradle-plugin:1.5'
1313

1414
// NOTE: Do not place your application dependencies here; they belong

lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import java.util.HashMap;
88
import java.util.Map;
99
import java.util.TreeMap;
10+
import java.util.concurrent.TimeUnit;
11+
import java.util.concurrent.TimeoutException;
1012

1113
import okhttp3.Headers;
1214
import okhttp3.OkHttpClient;
@@ -16,6 +18,7 @@
1618
import okhttp3.WebSocketListener;
1719
import okio.ByteString;
1820
import rx.Completable;
21+
import rx.subjects.BehaviorSubject;
1922

2023
class OkHttpConnectionProvider extends AbstractConnectionProvider {
2124

@@ -24,6 +27,7 @@ class OkHttpConnectionProvider extends AbstractConnectionProvider {
2427
private final Map<String, String> mConnectHttpHeaders;
2528
private final OkHttpClient mOkHttpClient;
2629
private final String tag = OkHttpConnectionProvider.class.getSimpleName();
30+
private final BehaviorSubject<Boolean> mConnectionStream;
2731

2832
@Nullable
2933
private WebSocket openedSocked;
@@ -33,24 +37,50 @@ class OkHttpConnectionProvider extends AbstractConnectionProvider {
3337
mUri = uri;
3438
mConnectHttpHeaders = connectHttpHeaders != null ? connectHttpHeaders : new HashMap<>();
3539
mOkHttpClient = okHttpClient;
40+
mConnectionStream = BehaviorSubject.create(false);
41+
mConnectionStream.subscribe(value -> Log.d(tag, "Connection stream emitted: " + value));
3642
}
3743

3844
@NonNull
3945
@Override
4046
public Completable disconnect() {
47+
Completable block = mConnectionStream
48+
.first(isConnected -> isConnected)
49+
.timeout(1, TimeUnit.SECONDS)
50+
.doOnError(error -> {
51+
if (error.getClass().equals(TimeoutException.class))
52+
Log.e(tag, "Attempted to disconnect when already disconnected");
53+
})
54+
.toCompletable();
55+
56+
/*
4157
if (openedSocked == null) {
4258
return Completable.error(new IllegalStateException("Attempted to disconnect when already disconnected"));
4359
}
60+
*/
61+
4462
return Completable
45-
.fromAction(() -> openedSocked.close(1000, ""));
63+
.fromAction(() -> openedSocked.close(1000, ""))
64+
.startWith(block);
4665
}
4766

4867
@Override
4968
void createWebSocketConnection() {
50-
69+
Completable block = mConnectionStream
70+
.first(isConnected -> !isConnected)
71+
.timeout(1, TimeUnit.SECONDS)
72+
.doOnError(error -> {
73+
if (error.getClass().equals(TimeoutException.class))
74+
Log.e(tag, "Attempted to connect when already connected");
75+
})
76+
.toCompletable();
77+
block.get(); // todo: do this the right way
78+
79+
/*
5180
if (openedSocked != null) {
5281
throw new IllegalStateException("Already have connection to web socket");
5382
}
83+
*/
5484

5585
Request.Builder requestBuilder = new Request.Builder()
5686
.url(mUri);
@@ -86,6 +116,7 @@ public void onMessage(WebSocket webSocket, @NonNull ByteString bytes) {
86116
public void onClosed(WebSocket webSocket, int code, String reason) {
87117
emitLifecycleEvent(new LifecycleEvent(LifecycleEvent.Type.CLOSED));
88118
openedSocked = null;
119+
mConnectionStream.onNext(false);
89120
}
90121

91122
@Override
@@ -95,6 +126,7 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) {
95126
}
96127

97128
);
129+
mConnectionStream.onNext(true);
98130
}
99131

100132
@Override

lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java8.util.concurrent.CompletableFuture;
1515
import rx.Completable;
1616
import rx.Observable;
17+
import rx.Subscription;
1718
import rx.schedulers.Schedulers;
1819
import rx.subjects.PublishSubject;
1920
import ua.naiksoftware.stomp.ConnectionProvider;
@@ -41,6 +42,8 @@ public class StompClient {
4142
private Completable mConnectionComplete;
4243
private HashMap<String, Observable<StompMessage>> mStreamMap;
4344
private Parser parser;
45+
private Subscription lifecycleSub;
46+
private List<StompHeader> mHeaders;
4447

4548
public StompClient(ConnectionProvider connectionProvider) {
4649
mConnectionProvider = connectionProvider;
@@ -82,28 +85,16 @@ public void connect() {
8285
connect(null);
8386
}
8487

85-
public void connect(boolean reconnect) {
86-
connect(null, reconnect);
87-
}
88-
89-
/**
90-
* Connect without reconnect if connected
91-
*
92-
* @param _headers HTTP headers to send in the INITIAL REQUEST, i.e. during the protocol upgrade
93-
*/
94-
public void connect(List<StompHeader> _headers) {
95-
connect(_headers, false);
96-
}
97-
9888
/**
9989
* If already connected and reconnect=false - nope
10090
*
10191
* @param _headers HTTP headers to send in the INITIAL REQUEST, i.e. during the protocol upgrade
10292
*/
103-
public void connect(@Nullable List<StompHeader> _headers, boolean reconnect) {
104-
if (reconnect) disconnect();
93+
public void connect(@Nullable List<StompHeader> _headers) {
94+
mHeaders = _headers;
95+
10596
if (mConnected) return;
106-
lifecycle()
97+
lifecycleSub = lifecycle()
10798
.subscribe(lifecycleEvent -> {
10899
switch (lifecycleEvent.getType()) {
109100
case OPENED:
@@ -138,6 +129,14 @@ public void connect(@Nullable List<StompHeader> _headers, boolean reconnect) {
138129
});
139130
}
140131

132+
/**
133+
* Disconnect from server, and then reconnect with the last-used headers
134+
*/
135+
public void reconnect() {
136+
disconnect();
137+
connect(mHeaders);
138+
}
139+
141140
public Completable send(String destination) {
142141
return send(destination, null);
143142
}
@@ -164,6 +163,7 @@ public Observable<LifecycleEvent> lifecycle() {
164163

165164
public void disconnect() {
166165
resetStatus();
166+
lifecycleSub.unsubscribe();
167167
mConnectionProvider.disconnect().subscribe(() -> mConnected = false);
168168
}
169169

0 commit comments

Comments
 (0)