Skip to content

Commit 545c4ef

Browse files
committed
Polish StompDecoder and the new Buffering sub-class
Issue: SPR-11527
1 parent bbdb72d commit 545c4ef

File tree

3 files changed

+85
-33
lines changed

3 files changed

+85
-33
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/BufferingStompDecoder.java

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.springframework.util.MultiValueMap;
2424

2525
import java.nio.ByteBuffer;
26-
import java.util.ArrayList;
2726
import java.util.Collections;
2827
import java.util.List;
2928
import java.util.Queue;
@@ -32,13 +31,17 @@
3231

3332
/**
3433
* An extension of {@link org.springframework.messaging.simp.stomp.StompDecoder}
35-
* that chunks any bytes remaining after a single full STOMP frame has been read.
36-
* The remaining bytes may contain more STOMP frames or an incomplete STOMP frame.
34+
* that buffers content remaining in the input ByteBuffer after the parent
35+
* class has read all (complete) STOMP frames from it. The remaining content
36+
* represents an incomplete STOMP frame. When called repeatedly with additional
37+
* data, the decode method returns one or more messages or, if there is not
38+
* enough data still, continues to buffer.
3739
*
38-
* <p>Similarly if there is not enough content for a full STOMP frame, the content
39-
* is buffered until more input is received. That means the
40-
* {@link #decode(java.nio.ByteBuffer)} effectively never returns {@code null} as
41-
* the parent class does.
40+
* <p>A single instance of this decoder can be invoked repeatedly to read all
41+
* messages from a single stream (e.g. WebSocket session) as long as decoding
42+
* does not fail. If there is an exception, StompDecoder instance should not
43+
* be used any more as its internal state is not guaranteed to be consistent.
44+
* It is expected that the underlying session is closed at that point.
4245
*
4346
* @author Rossen Stoyanchev
4447
* @since 4.0.3
@@ -58,10 +61,16 @@ public BufferingStompDecoder(int bufferSizeLimit) {
5861
}
5962

6063

64+
/**
65+
* Return the configured buffer size limit.
66+
*/
6167
public int getBufferSizeLimit() {
6268
return this.bufferSizeLimit;
6369
}
6470

71+
/**
72+
* Calculate the current buffer size.
73+
*/
6574
public int getBufferSize() {
6675
int size = 0;
6776
for (ByteBuffer buffer : this.chunks) {
@@ -70,29 +79,50 @@ public int getBufferSize() {
7079
return size;
7180
}
7281

82+
/**
83+
* Get the expected content length of the currently buffered, incomplete STOMP frame.
84+
*/
7385
public Integer getExpectedContentLength() {
7486
return this.expectedContentLength;
7587
}
7688

7789

90+
/**
91+
* Decodes one or more STOMP frames from the given {@code ByteBuffer} into a
92+
* list of {@link Message}s.
93+
*
94+
* <p>If there was enough data to parse a "content-length" header, then the
95+
* value is used to determine how much more data is needed before a new
96+
* attempt to decode is made.
97+
*
98+
* <p>If there was not enough data to parse the "content-length", or if there
99+
* is "content-length" header, every subsequent call to decode attempts to
100+
* parse again with all available data. Therefore the presence of a "content-length"
101+
* header helps to optimize the decoding of large messages.
102+
*
103+
* @param newBuffer a buffer containing new data to decode
104+
*
105+
* @return decoded messages or an empty list
106+
* @throws StompConversionException raised in case of decoding issues
107+
*/
78108
@Override
79-
public List<Message<byte[]>> decode(ByteBuffer newData) {
109+
public List<Message<byte[]>> decode(ByteBuffer newBuffer) {
80110

81-
this.chunks.add(newData);
111+
this.chunks.add(newBuffer);
82112

83113
checkBufferLimits();
84114

85115
if (getExpectedContentLength() != null && getBufferSize() < this.expectedContentLength) {
86116
return Collections.<Message<byte[]>>emptyList();
87117
}
88118

89-
ByteBuffer buffer = assembleChunksAndReset();
119+
ByteBuffer bufferToDecode = assembleChunksAndReset();
90120

91121
MultiValueMap<String, String> headers = new LinkedMultiValueMap<String, String>();
92-
List<Message<byte[]>> messages = decode(buffer, headers);
122+
List<Message<byte[]>> messages = decode(bufferToDecode, headers);
93123

94-
if (buffer.hasRemaining()) {
95-
this.chunks.add(buffer);
124+
if (bufferToDecode.hasRemaining()) {
125+
this.chunks.add(bufferToDecode);
96126
this.expectedContentLength = getContentLength(headers);
97127
}
98128

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompDecoder.java

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,18 @@
2828
import org.springframework.messaging.Message;
2929
import org.springframework.messaging.simp.SimpMessageType;
3030
import org.springframework.messaging.support.MessageBuilder;
31+
import org.springframework.util.Assert;
3132
import org.springframework.util.LinkedMultiValueMap;
3233
import org.springframework.util.MultiValueMap;
3334

3435
/**
35-
* Decodes one or more STOMP frames from a {@link ByteBuffer}. If the buffer
36-
* contains any additional (incomplete) data, or perhaps not enough data to
37-
* form even one Message, the the buffer is reset and the value returned is
38-
* an empty list indicating that no more message can be read.
36+
* Decodes one or more STOMP frames contained in a {@link ByteBuffer}.
37+
*
38+
* <p>An attempt is made to read all complete STOMP frames from the buffer, which
39+
* could be zero, one, or more. If there is any left-over content, i.e. an incomplete
40+
* STOMP frame, at the end the buffer is reset to point to the beginning of the
41+
* partial content. The caller is then responsible for dealing with that
42+
* incomplete content by buffering until there is more input available.
3943
*
4044
* @author Andy Wilkinson
4145
* @author Rossen Stoyanchev
@@ -52,10 +56,8 @@ public class StompDecoder {
5256

5357

5458
/**
55-
* Decodes one or more STOMP frames from the given {@code buffer} into a
56-
* list of {@link Message}s.
57-
*
58-
* <p>If the given ByteBuffer contains partial STOMP frame content, or additional
59+
* Decodes one or more STOMP frames from the given {@code ByteBuffer} into a
60+
* list of {@link Message}s. If the input buffer contains any incplcontains partial STOMP frame content, or additional
5961
* content with a partial STOMP frame, the buffer is reset and {@code null} is
6062
* returned.
6163
*
@@ -68,27 +70,37 @@ public List<Message<byte[]>> decode(ByteBuffer buffer) {
6870
}
6971

7072
/**
71-
* Decodes one or more STOMP frames from the given {@code buffer} into a
72-
* list of {@link Message}s.
73+
* Decodes one or more STOMP frames from the given {@code buffer} and returns
74+
* a list of {@link Message}s.
7375
*
74-
* <p>If the given ByteBuffer contains partial STOMP frame content, or additional
75-
* content with a partial STOMP frame, the buffer is reset and {@code null} is
76-
* returned.
76+
* <p>If the given ByteBuffer contains only partial STOMP frame content and no
77+
* complete STOMP frames, an empty list is returned, and the buffer is reset to
78+
* to where it was.
79+
*
80+
* <p>If the buffer contains one ore more STOMP frames, those are returned and
81+
* the buffer reset to point to the beginning of the unused partial content.
82+
*
83+
* <p>The input headers map is used to store successfully parsed headers and
84+
* is cleared after ever successfully read message. So when partial content is
85+
* read the caller can check if a "content-length" header was read, which helps
86+
* to determine how much more content is needed before the next STOMP frame
87+
* can be decoded.
7788
*
7889
* @param buffer The buffer to decode the STOMP frame from
79-
* @param headers an empty map that will be filled with the successfully parsed
80-
* headers of the last decoded message, or the last attempt at decoding an
81-
* (incomplete) STOMP frame. This can be useful for detecting 'content-length'.
90+
* @param headers an empty map that will contain successfully parsed headers
91+
* in cases where the partial buffer ended with a partial STOMP frame
8292
*
83-
* @return the decoded messages or an empty list
93+
* @return decoded messages or an empty list
94+
* @throws StompConversionException raised in case of decoding issues
8495
*/
8596
public List<Message<byte[]>> decode(ByteBuffer buffer, MultiValueMap<String, String> headers) {
97+
Assert.notNull(headers, "headers is required");
8698
List<Message<byte[]>> messages = new ArrayList<Message<byte[]>>();
8799
while (buffer.hasRemaining()) {
88-
headers.clear();
89100
Message<byte[]> m = decodeMessage(buffer, headers);
90101
if (m != null) {
91102
messages.add(m);
103+
headers.clear();
92104
}
93105
else {
94106
break;

spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,15 +79,25 @@ public class StompSubProtocolHandler implements SubProtocolHandler {
7979

8080

8181
/**
82-
* Set the message buffer size limit in bytes.
82+
* Configure the maximum size of the buffer used when a STOMP message has been
83+
* split over multiple WebSocket messages.
84+
*
85+
* <p>While the STOMP spec version 1.2 (current as of 4.0.3) does not discuss
86+
* STOMP over WebSocket explicitly, a number of clients already split messages
87+
* around 16K boundaries. Therefore partial content must be buffered before a
88+
* full message can be assembled.
89+
*
90+
* <p>By default this property is set to 64K.
91+
*
8392
* @since 4.0.3
8493
*/
8594
public void setMessageBufferSizeLimit(int messageBufferSizeLimit) {
8695
this.messageBufferSizeLimit = messageBufferSizeLimit;
8796
}
8897

8998
/**
90-
* Get the message buffer size limit in bytes.
99+
* Get the configured message buffer size limit in bytes.
100+
*
91101
* @since 4.0.3
92102
*/
93103
public int getMessageBufferSizeLimit() {

0 commit comments

Comments
 (0)