Skip to content

Commit 80d9f80

Browse files
committed
Websockets are broken
- remove manual demand, use `AutoDemaing` - fix #3814
1 parent de5f8f2 commit 80d9f80

File tree

4 files changed

+95
-19
lines changed

4 files changed

+95
-19
lines changed

modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/JettyWebSocket.java

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import static java.nio.charset.StandardCharsets.UTF_8;
99

1010
import java.nio.ByteBuffer;
11-
import java.nio.channels.ReadPendingException;
1211
import java.util.ArrayList;
1312
import java.util.Collections;
1413
import java.util.List;
@@ -36,7 +35,8 @@
3635
import io.jooby.WebSocketMessage;
3736
import io.jooby.output.Output;
3837

39-
public class JettyWebSocket implements Session.Listener, WebSocketConfigurer, WebSocket {
38+
public class JettyWebSocket
39+
implements Session.Listener.AutoDemanding, WebSocketConfigurer, WebSocket {
4040

4141
private static class WriteCallbackAdaptor implements org.eclipse.jetty.websocket.api.Callback {
4242

@@ -75,11 +75,7 @@ public void fail(Throwable cause) {
7575

7676
@Override
7777
public void succeed() {
78-
try {
79-
callback.operationComplete(ws, null);
80-
} finally {
81-
ws.demand();
82-
}
78+
callback.operationComplete(ws, null);
8379
}
8480
}
8581

@@ -138,14 +134,11 @@ public void onWebSocketOpen(Session session) {
138134
open.set(true);
139135
this.session = session;
140136
addSession(this);
141-
demand();
142137
if (onConnectCallback != null) {
143138
onConnectCallback.onConnect(this);
144139
}
145140
} catch (Throwable x) {
146141
onWebSocketError(x);
147-
} finally {
148-
demand();
149142
}
150143
}
151144

@@ -217,14 +210,6 @@ public Context getContext() {
217210
return Context.readOnly(ctx);
218211
}
219212

220-
private void demand() {
221-
try {
222-
session.demand();
223-
} catch (ReadPendingException cause) {
224-
ctx.getRouter().getLog().debug("Websocket resulted in exception: {}", path, cause);
225-
}
226-
}
227-
228213
@NonNull @Override
229214
public List<WebSocket> getSessions() {
230215
List<JettyWebSocket> sessions = all.get(key);
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Jooby https://jooby.io
3+
* Apache License Version 2.0 https://jooby.io/LICENSE.txt
4+
* Copyright 2014 Edgar Espina
5+
*/
6+
package io.jooby.i3814;
7+
8+
import java.util.concurrent.CountDownLatch;
9+
10+
import org.jetbrains.annotations.NotNull;
11+
import org.jetbrains.annotations.Nullable;
12+
13+
import io.jooby.junit.ServerTest;
14+
import io.jooby.junit.ServerTestRunner;
15+
import okhttp3.Response;
16+
import okhttp3.WebSocket;
17+
import okhttp3.WebSocketListener;
18+
import okio.ByteString;
19+
20+
public class Issue3814 {
21+
22+
@ServerTest
23+
public void shouldJettyWebSocketWorks(ServerTestRunner runner) {
24+
int messageCount = 10;
25+
var latch = new CountDownLatch(messageCount);
26+
runner
27+
.define(
28+
app -> {
29+
app.ws(
30+
"/3814",
31+
(ctx, initializer) -> {
32+
initializer.onMessage(
33+
(ws, message) -> {
34+
latch.countDown();
35+
});
36+
});
37+
})
38+
.ready(
39+
client -> {
40+
client.webSocket(
41+
"/3814",
42+
new WebSocketListener() {
43+
@Override
44+
public void onOpen(@NotNull WebSocket ws, @NotNull Response response) {
45+
for (int i = 0; i < messageCount; i++) {
46+
ws.send(">" + i);
47+
}
48+
}
49+
50+
@Override
51+
public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
52+
super.onMessage(webSocket, text);
53+
}
54+
55+
@Override
56+
public void onMessage(@NotNull WebSocket webSocket, @NotNull ByteString bytes) {
57+
super.onMessage(webSocket, bytes);
58+
}
59+
60+
@Override
61+
public void onClosing(
62+
@NotNull WebSocket webSocket, int code, @NotNull String reason) {
63+
super.onClosing(webSocket, code, reason);
64+
}
65+
66+
@Override
67+
public void onClosed(
68+
@NotNull WebSocket webSocket, int code, @NotNull String reason) {
69+
super.onClosed(webSocket, code, reason);
70+
}
71+
72+
@Override
73+
public void onFailure(
74+
@NotNull WebSocket webSocket,
75+
@NotNull Throwable t,
76+
@Nullable Response response) {
77+
super.onFailure(webSocket, t, response);
78+
}
79+
});
80+
latch.await();
81+
});
82+
}
83+
}

tests/src/test/java/io/jooby/test/WebClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,14 @@ public void syncWebSocket(String path, SneakyThrows.Consumer<BlockingWebSocket>
313313
blockingWebSocket.close();
314314
}
315315

316+
public WebSocket webSocket(String path, WebSocketListener listener) {
317+
okhttp3.Request.Builder req = new okhttp3.Request.Builder();
318+
req.url("ws://localhost:" + port + path);
319+
setRequestHeaders(req);
320+
okhttp3.Request r = req.build();
321+
return client.newWebSocket(r, listener);
322+
}
323+
316324
public Request options(String path) {
317325
return invoke("OPTIONS", path, null);
318326
}

tests/src/test/java/io/jooby/test/WebSocketTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public void webSocket(ServerTestRunner runner) {
4747
}
4848

4949
@ServerTest
50-
public void webSocketRecieveByteMessage(ServerTestRunner runner) {
50+
public void webSocketReceiveByteMessage(ServerTestRunner runner) {
5151
runner
5252
.define(
5353
app -> {

0 commit comments

Comments
 (0)