Skip to content

Commit ed172d6

Browse files
committed
ByteBuffer handling for Jetty WebSocket messages
Closes gh-31182
1 parent f51838b commit ed172d6

File tree

2 files changed

+238
-31
lines changed

2 files changed

+238
-31
lines changed

spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java

Lines changed: 226 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
package org.springframework.web.reactive.socket.adapter;
1818

1919
import java.nio.ByteBuffer;
20+
import java.nio.charset.Charset;
2021
import java.nio.charset.StandardCharsets;
2122
import java.util.function.Function;
23+
import java.util.function.IntPredicate;
2224

2325
import org.eclipse.jetty.websocket.api.Callback;
2426
import org.eclipse.jetty.websocket.api.Frame;
@@ -31,14 +33,15 @@
3133
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
3234
import org.eclipse.jetty.websocket.core.OpCode;
3335

36+
import org.springframework.core.io.buffer.CloseableDataBuffer;
3437
import org.springframework.core.io.buffer.DataBuffer;
38+
import org.springframework.core.io.buffer.DataBufferFactory;
3539
import org.springframework.lang.Nullable;
3640
import org.springframework.util.Assert;
3741
import org.springframework.web.reactive.socket.CloseStatus;
3842
import org.springframework.web.reactive.socket.WebSocketHandler;
3943
import org.springframework.web.reactive.socket.WebSocketMessage;
4044
import org.springframework.web.reactive.socket.WebSocketMessage.Type;
41-
import org.springframework.web.reactive.socket.WebSocketSession;
4245

4346
/**
4447
* Jetty {@link WebSocket @WebSocket} handler that delegates events to a
@@ -83,53 +86,36 @@ public void onWebSocketOpen(Session session) {
8386
@OnWebSocketMessage
8487
public void onWebSocketText(String message) {
8588
if (this.delegateSession != null) {
86-
WebSocketMessage webSocketMessage = toMessage(Type.TEXT, message);
89+
byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
90+
DataBuffer buffer = this.delegateSession.bufferFactory().wrap(bytes);
91+
WebSocketMessage webSocketMessage = new WebSocketMessage(Type.TEXT, buffer);
8792
this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage);
8893
}
8994
}
9095

9196
@OnWebSocketMessage
92-
public void onWebSocketBinary(ByteBuffer buffer, Callback callback) {
97+
public void onWebSocketBinary(ByteBuffer byteBuffer, Callback callback) {
9398
if (this.delegateSession != null) {
94-
WebSocketMessage webSocketMessage = toMessage(Type.BINARY, buffer);
99+
DataBuffer buffer = this.delegateSession.bufferFactory().wrap(byteBuffer);
100+
buffer = new JettyDataBuffer(buffer, callback);
101+
WebSocketMessage webSocketMessage = new WebSocketMessage(Type.BINARY, buffer);
95102
this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage);
96-
callback.succeed();
97103
}
98104
}
99105

100106
@OnWebSocketFrame
101107
public void onWebSocketFrame(Frame frame, Callback callback) {
102108
if (this.delegateSession != null) {
103109
if (OpCode.PONG == frame.getOpCode()) {
104-
ByteBuffer buffer = (frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD);
105-
WebSocketMessage webSocketMessage = toMessage(Type.PONG, buffer);
110+
ByteBuffer byteBuffer = (frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD);
111+
DataBuffer buffer = this.delegateSession.bufferFactory().wrap(byteBuffer);
112+
buffer = new JettyDataBuffer(buffer, callback);
113+
WebSocketMessage webSocketMessage = new WebSocketMessage(Type.PONG, buffer);
106114
this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage);
107-
callback.succeed();
108115
}
109116
}
110117
}
111118

112-
private <T> WebSocketMessage toMessage(Type type, T message) {
113-
WebSocketSession session = this.delegateSession;
114-
Assert.state(session != null, "Cannot create message without a session");
115-
if (Type.TEXT.equals(type)) {
116-
byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8);
117-
DataBuffer buffer = session.bufferFactory().wrap(bytes);
118-
return new WebSocketMessage(Type.TEXT, buffer);
119-
}
120-
else if (Type.BINARY.equals(type)) {
121-
DataBuffer buffer = session.bufferFactory().wrap((ByteBuffer) message);
122-
return new WebSocketMessage(Type.BINARY, buffer);
123-
}
124-
else if (Type.PONG.equals(type)) {
125-
DataBuffer buffer = session.bufferFactory().wrap((ByteBuffer) message);
126-
return new WebSocketMessage(Type.PONG, buffer);
127-
}
128-
else {
129-
throw new IllegalArgumentException("Unexpected message type: " + message);
130-
}
131-
}
132-
133119
@OnWebSocketClose
134120
public void onWebSocketClose(int statusCode, String reason) {
135121
if (this.delegateSession != null) {
@@ -144,4 +130,215 @@ public void onWebSocketError(Throwable cause) {
144130
}
145131
}
146132

133+
134+
private static final class JettyDataBuffer implements CloseableDataBuffer {
135+
136+
private final DataBuffer delegate;
137+
138+
private final Callback callback;
139+
140+
public JettyDataBuffer(DataBuffer delegate, Callback callback) {
141+
Assert.notNull(delegate, "'delegate` must not be null");
142+
Assert.notNull(callback, "Callback must not be null");
143+
this.delegate = delegate;
144+
this.callback = callback;
145+
}
146+
147+
@Override
148+
public void close() {
149+
this.callback.succeed();
150+
}
151+
152+
// delegation
153+
154+
@Override
155+
public DataBufferFactory factory() {
156+
return this.delegate.factory();
157+
}
158+
159+
@Override
160+
public int indexOf(IntPredicate predicate, int fromIndex) {
161+
return this.delegate.indexOf(predicate, fromIndex);
162+
}
163+
164+
@Override
165+
public int lastIndexOf(IntPredicate predicate, int fromIndex) {
166+
return this.delegate.lastIndexOf(predicate, fromIndex);
167+
}
168+
169+
@Override
170+
public int readableByteCount() {
171+
return this.delegate.readableByteCount();
172+
}
173+
174+
@Override
175+
public int writableByteCount() {
176+
return this.delegate.writableByteCount();
177+
}
178+
179+
@Override
180+
public int capacity() {
181+
return this.delegate.capacity();
182+
}
183+
184+
@Override
185+
@Deprecated
186+
public DataBuffer capacity(int capacity) {
187+
this.delegate.capacity(capacity);
188+
return this;
189+
}
190+
191+
@Override
192+
public DataBuffer ensureWritable(int capacity) {
193+
this.delegate.ensureWritable(capacity);
194+
return this;
195+
}
196+
197+
@Override
198+
public int readPosition() {
199+
return this.delegate.readPosition();
200+
}
201+
202+
@Override
203+
public DataBuffer readPosition(int readPosition) {
204+
this.delegate.readPosition(readPosition);
205+
return this;
206+
}
207+
208+
@Override
209+
public int writePosition() {
210+
return this.delegate.writePosition();
211+
}
212+
213+
@Override
214+
public DataBuffer writePosition(int writePosition) {
215+
this.delegate.writePosition(writePosition);
216+
return this;
217+
}
218+
219+
@Override
220+
public byte getByte(int index) {
221+
return this.delegate.getByte(index);
222+
}
223+
224+
@Override
225+
public byte read() {
226+
return this.delegate.read();
227+
}
228+
229+
@Override
230+
public DataBuffer read(byte[] destination) {
231+
this.delegate.read(destination);
232+
return this;
233+
}
234+
235+
@Override
236+
public DataBuffer read(byte[] destination, int offset, int length) {
237+
this.delegate.read(destination, offset, length);
238+
return this;
239+
}
240+
241+
@Override
242+
public DataBuffer write(byte b) {
243+
this.delegate.write(b);
244+
return this;
245+
}
246+
247+
@Override
248+
public DataBuffer write(byte[] source) {
249+
this.delegate.write(source);
250+
return this;
251+
}
252+
253+
@Override
254+
public DataBuffer write(byte[] source, int offset, int length) {
255+
this.delegate.write(source, offset, length);
256+
return this;
257+
}
258+
259+
@Override
260+
public DataBuffer write(DataBuffer... buffers) {
261+
this.delegate.write(buffers);
262+
return this;
263+
}
264+
265+
@Override
266+
public DataBuffer write(ByteBuffer... buffers) {
267+
this.delegate.write(buffers);
268+
return this;
269+
}
270+
271+
@Override
272+
@Deprecated
273+
public DataBuffer slice(int index, int length) {
274+
DataBuffer delegateSlice = this.delegate.slice(index, length);
275+
return new JettyDataBuffer(delegateSlice, this.callback);
276+
}
277+
278+
@Override
279+
public DataBuffer split(int index) {
280+
DataBuffer delegateSplit = this.delegate.split(index);
281+
return new JettyDataBuffer(delegateSplit, this.callback);
282+
}
283+
284+
@Override
285+
@Deprecated
286+
public ByteBuffer asByteBuffer() {
287+
return this.delegate.asByteBuffer();
288+
}
289+
290+
@Override
291+
@Deprecated
292+
public ByteBuffer asByteBuffer(int index, int length) {
293+
return this.delegate.asByteBuffer(index, length);
294+
}
295+
296+
@Override
297+
@Deprecated
298+
public ByteBuffer toByteBuffer(int index, int length) {
299+
return this.delegate.toByteBuffer(index, length);
300+
}
301+
302+
@Override
303+
public void toByteBuffer(int srcPos, ByteBuffer dest, int destPos, int length) {
304+
this.delegate.toByteBuffer(srcPos, dest, destPos, length);
305+
}
306+
307+
@Override
308+
public ByteBufferIterator readableByteBuffers() {
309+
ByteBufferIterator delegateIterator = this.delegate.readableByteBuffers();
310+
return new JettyByteBufferIterator(delegateIterator);
311+
}
312+
313+
@Override
314+
public ByteBufferIterator writableByteBuffers() {
315+
ByteBufferIterator delegateIterator = this.delegate.writableByteBuffers();
316+
return new JettyByteBufferIterator(delegateIterator);
317+
}
318+
319+
@Override
320+
public String toString(int index, int length, Charset charset) {
321+
return this.delegate.toString(index, length, charset);
322+
}
323+
324+
325+
private record JettyByteBufferIterator(ByteBufferIterator delegate) implements ByteBufferIterator {
326+
327+
@Override
328+
public void close() {
329+
this.delegate.close();
330+
}
331+
332+
@Override
333+
public boolean hasNext() {
334+
return this.delegate.hasNext();
335+
}
336+
337+
@Override
338+
public ByteBuffer next() {
339+
return this.delegate.next();
340+
}
341+
}
342+
}
343+
147344
}

spring-websocket/src/main/java/org/springframework/web/socket/adapter/jetty/JettyWebSocketHandlerAdapter.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,13 @@ public void onWebSocketText(String payload) {
9090

9191
@OnWebSocketMessage
9292
public void onWebSocketBinary(ByteBuffer payload, Callback callback) {
93-
BinaryMessage message = new BinaryMessage(payload, true);
93+
BinaryMessage message = new BinaryMessage(copyByteBuffer(payload), true);
9494
try {
9595
this.webSocketHandler.handleMessage(this.wsSession, message);
96+
callback.succeed();
9697
}
9798
catch (Exception ex) {
99+
callback.fail(ex);
98100
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger);
99101
}
100102
}
@@ -103,16 +105,24 @@ public void onWebSocketBinary(ByteBuffer payload, Callback callback) {
103105
public void onWebSocketFrame(Frame frame, Callback callback) {
104106
if (OpCode.PONG == frame.getOpCode()) {
105107
ByteBuffer payload = frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD;
106-
PongMessage message = new PongMessage(payload);
108+
PongMessage message = new PongMessage(copyByteBuffer(payload));
107109
try {
108110
this.webSocketHandler.handleMessage(this.wsSession, message);
111+
callback.succeed();
109112
}
110113
catch (Exception ex) {
114+
callback.fail(ex);
111115
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger);
112116
}
113117
}
114118
}
115119

120+
private static ByteBuffer copyByteBuffer(ByteBuffer src) {
121+
ByteBuffer dest = ByteBuffer.allocate(src.capacity());
122+
dest.put(0, src, 0, src.remaining());
123+
return dest;
124+
}
125+
116126
@OnWebSocketClose
117127
public void onWebSocketClose(int statusCode, String reason) {
118128
CloseStatus closeStatus = new CloseStatus(statusCode, reason);

0 commit comments

Comments
 (0)