Skip to content

Commit 0d4d455

Browse files
committed
add ping and pong events
1 parent 599eb98 commit 0d4d455

File tree

4 files changed

+78
-3
lines changed

4 files changed

+78
-3
lines changed

src/main/java/io/socket/client/Manager.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ public class Manager extends Emitter {
6464

6565
public static final String EVENT_RECONNECTING = "reconnecting";
6666

67+
public static final String EVENT_PING = "ping";
68+
69+
public static final String EVENT_PONG = "pong";
70+
6771
/**
6872
* Called when a new transport is created. (experimental)
6973
*/
@@ -85,6 +89,7 @@ public class Manager extends Emitter {
8589
private Backoff backoff;
8690
private long _timeout;
8791
private Set<Socket> connecting = new HashSet<Socket>();
92+
private Date lastPing;
8893
private URI uri;
8994
private List<Packet> packetBuffer;
9095
private Queue<On.Handle> subs;
@@ -348,10 +353,16 @@ public void call(Object... objects) {
348353
}
349354
}
350355
}));
351-
this.subs.add(On.on(this.decoder, Parser.Decoder.EVENT_DECODED, new Listener() {
356+
this.subs.add(On.on(socket, Engine.EVENT_PING, new Listener() {
352357
@Override
353358
public void call(Object... objects) {
354-
Manager.this.ondecoded((Packet) objects[0]);
359+
Manager.this.onping();
360+
}
361+
}));
362+
this.subs.add(On.on(socket, Engine.EVENT_PONG, new Listener() {
363+
@Override
364+
public void call(Object... objects) {
365+
Manager.this.onpong();
355366
}
356367
}));
357368
this.subs.add(On.on(socket, Engine.EVENT_ERROR, new Listener() {
@@ -366,6 +377,22 @@ public void call(Object... objects) {
366377
Manager.this.onclose((String)objects[0]);
367378
}
368379
}));
380+
this.subs.add(On.on(this.decoder, Parser.Decoder.EVENT_DECODED, new Listener() {
381+
@Override
382+
public void call(Object... objects) {
383+
Manager.this.ondecoded((Packet) objects[0]);
384+
}
385+
}));
386+
}
387+
388+
private void onping() {
389+
this.lastPing = new Date();
390+
this.emitAll(EVENT_PING);
391+
}
392+
393+
private void onpong() {
394+
this.emitAll(EVENT_PONG,
395+
null != this.lastPing ? new Date().getTime() - this.lastPing.getTime() : 0);
369396
}
370397

371398
private void ondata(String data) {
@@ -458,8 +485,12 @@ private void processPacketQueue() {
458485
}
459486

460487
private void cleanup() {
488+
logger.fine("cleanup");
489+
461490
On.Handle sub;
462491
while ((sub = this.subs.poll()) != null) sub.destroy();
492+
493+
this.lastPing = null;
463494
}
464495

465496
/*package*/ void close() {

src/main/java/io/socket/client/Socket.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ public class Socket extends Emitter {
5858

5959
public static final String EVENT_RECONNECTING = Manager.EVENT_RECONNECTING;
6060

61+
public static final String EVENT_PING = Manager.EVENT_PING;
62+
63+
public static final String EVENT_PONG = Manager.EVENT_PONG;
64+
6165
protected static Map<String, Integer> events = new HashMap<String, Integer>() {{
6266
put(EVENT_CONNECT, 1);
6367
put(EVENT_CONNECT_ERROR, 1);
@@ -70,6 +74,8 @@ public class Socket extends Emitter {
7074
put(EVENT_RECONNECT_FAILED, 1);
7175
put(EVENT_RECONNECT_ERROR, 1);
7276
put(EVENT_RECONNECTING, 1);
77+
put(EVENT_PING, 1);
78+
put(EVENT_PONG, 1);
7379
}};
7480

7581
/*package*/ String id;

src/test/java/io/socket/client/SocketTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.concurrent.LinkedBlockingQueue;
1212

1313
import static org.hamcrest.CoreMatchers.*;
14+
import static org.hamcrest.Matchers.greaterThan;
1415
import static org.junit.Assert.assertThat;
1516

1617
@RunWith(JUnit4.class)
@@ -60,6 +61,43 @@ public void call(Object... args) {
6061
assertThat(id.isPresent(), is(false));
6162
}
6263

64+
@Test(timeout = TIMEOUT)
65+
public void pingAndPongWithLatency() throws URISyntaxException, InterruptedException {
66+
final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>();
67+
socket = client();
68+
socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
69+
@Override
70+
public void call(Object... objects) {
71+
final boolean[] pinged = new boolean[] { false };
72+
socket.once(Socket.EVENT_PING, new Emitter.Listener() {
73+
@Override
74+
public void call(Object... args) {
75+
pinged[0] = true;
76+
}
77+
});
78+
socket.once(Socket.EVENT_PONG, new Emitter.Listener() {
79+
@Override
80+
public void call(Object... args) {
81+
long ms = (long)args[0];
82+
values.offer(pinged[0]);
83+
values.offer(ms);
84+
}
85+
});
86+
}
87+
});
88+
socket.connect();
89+
90+
@SuppressWarnings("unchecked")
91+
boolean pinged = (boolean)values.take();
92+
assertThat(pinged, is(true));
93+
94+
@SuppressWarnings("unchecked")
95+
long ms = (long)values.take();
96+
assertThat(ms, greaterThan((long)0));
97+
98+
socket.disconnect();
99+
}
100+
63101
@Test(timeout = TIMEOUT)
64102
public void shouldChangeSocketIdUponReconnection() throws URISyntaxException, InterruptedException {
65103
final BlockingQueue<Optional> values = new LinkedBlockingQueue<Optional>();

src/test/resources/server.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ if (process.env.SSL) {
1010
server = require('http').createServer();
1111
}
1212

13-
var io = require('socket.io')(server);
13+
var io = require('socket.io')(server, { pingInterval: 2000 });
1414
var port = process.env.PORT || 3000;
1515
var nsp = process.argv[2] || '/';
1616
var slice = Array.prototype.slice;

0 commit comments

Comments
 (0)