Skip to content

Commit cff4f89

Browse files
author
Nickolay Savchenko
committed
Merge luisalves00 heartbeat support
1 parent c889db4 commit cff4f89

File tree

7 files changed

+309
-22
lines changed

7 files changed

+309
-22
lines changed

lib/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ dependencies {
5656
implementation "io.reactivex.rxjava2:rxjava:2.1.15"
5757
// Supported transports
5858
api 'org.java-websocket:Java-WebSocket:1.3.6'
59-
api 'com.squareup.okhttp3:okhttp:3.10.0'
59+
api 'com.squareup.okhttp3:okhttp:3.11.0'
6060

6161
implementation 'com.android.support:support-annotations:28.0.0'
6262

lib/src/main/java/ua/naiksoftware/stomp/AbstractConnectionProvider.java

Lines changed: 163 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,16 @@
44
import android.support.annotation.Nullable;
55
import android.util.Log;
66

7+
import java.util.concurrent.TimeUnit;
8+
79
import io.reactivex.Completable;
810
import io.reactivex.Observable;
11+
import io.reactivex.Scheduler;
12+
import io.reactivex.disposables.Disposable;
13+
import io.reactivex.schedulers.Schedulers;
914
import io.reactivex.subjects.PublishSubject;
15+
import ua.naiksoftware.stomp.client.StompCommand;
16+
import ua.naiksoftware.stomp.client.StompMessage;
1017

1118
/**
1219
* Created by forresthopkinsa on 8/8/2017.
@@ -18,6 +25,16 @@ abstract class AbstractConnectionProvider implements ConnectionProvider {
1825

1926
private static final String TAG = AbstractConnectionProvider.class.getSimpleName();
2027

28+
private transient Disposable clientSendHeartBeatTask;
29+
private transient Disposable serverCheckHeartBeatTask;
30+
private Scheduler scheduler;
31+
32+
private int serverHeartbeat = 0;
33+
private int clientHeartbeat = 0;
34+
35+
private transient long lastServerHeartBeat = 0;
36+
37+
2138
@NonNull
2239
private final PublishSubject<LifecycleEvent> mLifecycleStream;
2340
@NonNull
@@ -46,6 +63,11 @@ public Observable<String> messages() {
4663

4764
@Override
4865
public Completable disconnect() {
66+
if (clientSendHeartBeatTask != null) {
67+
clientSendHeartBeatTask.dispose();
68+
}
69+
scheduler.shutdown();
70+
Log.d(TAG, "Shutting down heart-beat scheduler...");
4971
return Completable
5072
.fromAction(this::rawDisconnect);
5173
}
@@ -55,11 +77,6 @@ private Completable initSocket() {
5577
.fromAction(this::createWebSocketConnection);
5678
}
5779

58-
// Doesn't do anything at all, only here as a stub
59-
public Completable setHeartbeat(int ms) {
60-
return Completable.complete();
61-
}
62-
6380
/**
6481
* Most important method: connects to websocket and notifies program of messages.
6582
* <p>
@@ -111,13 +128,152 @@ void emitLifecycleEvent(@NonNull LifecycleEvent lifecycleEvent) {
111128
}
112129

113130
void emitMessage(String stompMessage) {
114-
Log.d(TAG, "Emit STOMP message: " + stompMessage);
115-
mMessagesStream.onNext(stompMessage);
131+
//TODO: Why we don't publish a StompMessage, instead of String? will this connection provider work with other protocol?
132+
final StompMessage sm = StompMessage.from(stompMessage);
133+
if (StompCommand.CONNECTED.equals(sm.getStompCommand())) {
134+
heartBeatHandshake(sm.findHeader(StompHeader.HEART_BEAT));
135+
} else if (StompCommand.SEND.equals(sm.getStompCommand())) {
136+
abortClientHeartBeatSend();
137+
} else if (StompCommand.MESSAGE.equals(sm.getStompCommand())) {
138+
//a MESSAGE works as an hear-beat too.
139+
abortServerHeartBeatCheck();
140+
}
141+
142+
if (stompMessage.equals("\n")) {
143+
Log.d(TAG, "<<< PONG");
144+
abortServerHeartBeatCheck();
145+
} else {
146+
Log.d(TAG, "Receive STOMP message: " + stompMessage);
147+
mMessagesStream.onNext(stompMessage);
148+
}
116149
}
117150

118151
@NonNull
119152
@Override
120153
public Observable<LifecycleEvent> lifecycle() {
121154
return mLifecycleStream;
122155
}
156+
157+
/**
158+
* Analise heart-beat sent from server (if any), to adjust the frequency.
159+
* Startup the heart-beat logic.
160+
*
161+
* @param heartBeatHeader
162+
*/
163+
private void heartBeatHandshake(final String heartBeatHeader) {
164+
if (heartBeatHeader != null) {
165+
// The heart-beat header is OPTIONAL
166+
final String[] heartbeats = heartBeatHeader.split(",");
167+
if (clientHeartbeat > 0) {
168+
//there will be heart-beats every MAX(<cx>,<sy>) milliseconds
169+
clientHeartbeat = Math.max(clientHeartbeat, Integer.parseInt(heartbeats[1]));
170+
}
171+
if (serverHeartbeat > 0) {
172+
//there will be heart-beats every MAX(<cx>,<sy>) milliseconds
173+
serverHeartbeat = Math.max(serverHeartbeat, Integer.parseInt(heartbeats[0]));
174+
}
175+
}
176+
if (clientHeartbeat > 0 || serverHeartbeat > 0) {
177+
scheduler = Schedulers.io();
178+
179+
if (clientHeartbeat > 0) {
180+
//client MUST/WANT send heart-beat
181+
Log.d(TAG, "Client will send heart-beat every " + clientHeartbeat + " ms");
182+
scheduleClientHeartBeat();
183+
}
184+
if (serverHeartbeat > 0) {
185+
Log.d(TAG, "Client will listen to server heart-beat every " + serverHeartbeat + " ms");
186+
//client WANT to listen to server heart-beat
187+
scheduleServerHeartBeatCheck();
188+
}
189+
}
190+
}
191+
192+
protected void scheduleServerHeartBeatCheck() {
193+
if (serverHeartbeat > 0 && scheduler != null) {
194+
Log.d(TAG, "Scheduling server heart-beat to be checked in " + serverHeartbeat + " ms");
195+
//add some slack on the check
196+
serverCheckHeartBeatTask = scheduler.scheduleDirect(() ->
197+
checkServerHeartBeat(), serverHeartbeat, TimeUnit.MILLISECONDS);
198+
}
199+
}
200+
201+
private void checkServerHeartBeat() {
202+
if (serverHeartbeat > 0) {
203+
final long now = System.currentTimeMillis();
204+
//use a forgiving boundary as some heart beats can be delayed or lost.
205+
final long boundary = now - (3 * serverHeartbeat);
206+
//we need to check because the task could failed to abort
207+
if (lastServerHeartBeat < boundary) {
208+
Log.d(TAG, "It's a sad day ;( Server didn't send heart-beat on time. Last received at '" + lastServerHeartBeat + "' and now is '" + now + "'");
209+
final LifecycleEvent failedServerHeartBeat = new LifecycleEvent(LifecycleEvent.Type.FAILED_SERVER_HEARTBEAT);
210+
emitLifecycleEvent(failedServerHeartBeat);
211+
} else {
212+
Log.d(TAG, "We were checking and server sent heart-beat on time. So well-behaved :)");
213+
lastServerHeartBeat = System.currentTimeMillis();
214+
}
215+
}
216+
}
217+
218+
/**
219+
* Used to abort the server heart-beat check.
220+
*/
221+
private void abortServerHeartBeatCheck() {
222+
lastServerHeartBeat = System.currentTimeMillis();
223+
Log.d(TAG, "Aborted last check because server sent heart-beat on time ('" + lastServerHeartBeat + "'). So well-behaved :)");
224+
if (serverCheckHeartBeatTask != null) {
225+
serverCheckHeartBeatTask.dispose();
226+
}
227+
scheduleServerHeartBeatCheck();
228+
}
229+
230+
/**
231+
* Schedule a client heart-beat if clientHeartbeat > 0.
232+
*/
233+
public void scheduleClientHeartBeat() {
234+
if (clientHeartbeat > 0 && scheduler != null) {
235+
Log.d(TAG, "Scheduling client heart-beat to be sent in " + clientHeartbeat + " ms");
236+
clientSendHeartBeatTask = scheduler.scheduleDirect(() ->
237+
sendClientHeartBeat(), clientHeartbeat, TimeUnit.MILLISECONDS);
238+
}
239+
}
240+
241+
/**
242+
* Send the raw heart-beat to the server.
243+
*/
244+
private void sendClientHeartBeat() {
245+
this.rawSend("\r\n");
246+
Log.d(TAG, "PING >>>");
247+
//schedule next client heart beat
248+
this.scheduleClientHeartBeat();
249+
}
250+
251+
/**
252+
* Used when we have a scheduled heart-beat and we send a new message to the server.
253+
* The new message will work as an heart-beat so we can abort current one and schedule another
254+
*/
255+
private void abortClientHeartBeatSend() {
256+
if (clientSendHeartBeatTask != null) {
257+
clientSendHeartBeatTask.dispose();
258+
}
259+
scheduleClientHeartBeat();
260+
}
261+
262+
/**
263+
* Set the server heart-beat
264+
*
265+
* @param ms milliseconds
266+
*/
267+
public void setServerHeartbeat(int ms) {
268+
this.serverHeartbeat = ms;
269+
}
270+
271+
/**
272+
* Set the client heart-beat
273+
*
274+
* @param ms milliseconds
275+
*/
276+
public void setClientHeartbeat(int ms) {
277+
this.clientHeartbeat = ms;
278+
}
123279
}

lib/src/main/java/ua/naiksoftware/stomp/ConnectionProvider.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,18 @@ public interface ConnectionProvider {
3232
*/
3333
Completable disconnect();
3434

35-
Completable setHeartbeat(int ms);
35+
/**
36+
* Set the server heart-beat
37+
*
38+
* @param ms milliseconds
39+
*/
40+
void setServerHeartbeat(int ms);
41+
42+
/**
43+
* Set the client heart-beat
44+
*
45+
* @param ms milliseconds
46+
*/
47+
void setClientHeartbeat(int ms);
48+
3649
}

lib/src/main/java/ua/naiksoftware/stomp/LifecycleEvent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
public class LifecycleEvent {
1010

1111
public enum Type {
12-
OPENED, CLOSED, ERROR
12+
OPENED, CLOSED, ERROR, FAILED_SERVER_HEARTBEAT;
1313
}
1414

1515
private final Type mType;

lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,8 @@ public void onOpen(WebSocket webSocket, @NonNull Response response) {
6363

6464
@Override
6565
public void onMessage(WebSocket webSocket, String text) {
66-
if (text.equals("\n"))
67-
Log.d(TAG, "RECEIVED HEARTBEAT");
68-
else
69-
emitMessage(text);
66+
Log.d(TAG, "onMessage: " + text);
67+
emitMessage(text);
7068
}
7169

7270
@Override

lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ public class StompClient {
4545
private Disposable mLifecycleDisposable;
4646
private Disposable mMessagesDisposable;
4747
private List<StompHeader> mHeaders;
48-
private int heartbeat;
48+
49+
private int serverHeartbeat = 0;
50+
private int clientHeartbeat = 0;
4951

5052
public StompClient(ConnectionProvider connectionProvider) {
5153
mConnectionProvider = connectionProvider;
@@ -82,9 +84,23 @@ public void setParser(Parser parser) {
8284
*
8385
* @param ms heartbeat time in milliseconds
8486
*/
85-
public void setHeartbeat(int ms) {
86-
heartbeat = ms;
87-
mConnectionProvider.setHeartbeat(ms).subscribe();
87+
public StompClient withServerHeartbeat(int ms) {
88+
mConnectionProvider.setServerHeartbeat(ms);
89+
this.serverHeartbeat = ms;
90+
return this;
91+
}
92+
93+
/**
94+
* Sets the heartbeat interval that client propose to send.
95+
* <p>
96+
* Not very useful yet, because we don't have any heartbeat logic on our side.
97+
*
98+
* @param ms heartbeat time in milliseconds
99+
*/
100+
public StompClient withClientHeartbeat(int ms) {
101+
mConnectionProvider.setClientHeartbeat(ms);
102+
this.clientHeartbeat = ms;
103+
return this;
88104
}
89105

90106
/**
@@ -115,7 +131,7 @@ public void connect(@Nullable List<StompHeader> _headers) {
115131
case OPENED:
116132
List<StompHeader> headers = new ArrayList<>();
117133
headers.add(new StompHeader(StompHeader.VERSION, SUPPORTED_VERSIONS));
118-
headers.add(new StompHeader(StompHeader.HEART_BEAT, "0," + heartbeat));
134+
headers.add(new StompHeader(StompHeader.HEART_BEAT, clientHeartbeat + "," + serverHeartbeat));
119135
if (_headers != null) headers.addAll(_headers);
120136
mConnectionProvider.send(new StompMessage(StompCommand.CONNECT, headers, null).compile(legacyWhitespace))
121137
.subscribe();
@@ -132,6 +148,11 @@ public void connect(@Nullable List<StompHeader> _headers) {
132148
setConnected(false);
133149
isConnecting = false;
134150
break;
151+
152+
case FAILED_SERVER_HEARTBEAT:
153+
Log.d(TAG, "Server failed to send heart-beat in time.");
154+
break;
155+
135156
}
136157
});
137158

@@ -143,7 +164,6 @@ public void connect(@Nullable List<StompHeader> _headers) {
143164
.subscribe(stompMessage -> {
144165
setConnected(true);
145166
isConnecting = false;
146-
147167
});
148168
}
149169

@@ -173,7 +193,8 @@ public Completable send(String destination, String data) {
173193
}
174194

175195
public Completable send(@NonNull StompMessage stompMessage) {
176-
Completable completable = mConnectionProvider.send(stompMessage.compile(legacyWhitespace));
196+
Completable completable = mConnectionProvider.send(stompMessage.compile(legacyWhitespace))
197+
.doOnError(t -> t.printStackTrace());
177198
CompletableSource connectionComplete = mConnectionStream
178199
.filter(isConnected -> isConnected)
179200
.firstOrError().toCompletable();
@@ -201,7 +222,9 @@ public Completable disconnectCompletable() {
201222
mMessagesDisposable.dispose();
202223
}
203224
return mConnectionProvider.disconnect()
204-
.doOnComplete(() -> setConnected(false));
225+
.doOnComplete(() -> {
226+
setConnected(false);
227+
});
205228
}
206229

207230
public Flowable<StompMessage> topic(String destinationPath) {

0 commit comments

Comments
 (0)