Skip to content

Commit 01a82b5

Browse files
committed
Improve semantics writing currentData
Before this commit, the return value from write was interpreted as the data being fully written and ready to be released via releaseData(). This is not true for WebSocketSession implementations where a true return value simply means the message was sent with the full payload but releas is not appropriate until a send confirmation. Technically not an issue since WebSocketSession's extending this do not use pooled buffers. Nevertheless this commit refines the semantics of write, removes the releaseData() method, and makes sub-classes responsible for releasing the buffer when fully written (and they know best when that is). As a bonus currentData is now private. Issue: SPR-16207
1 parent 102a0ad commit 01a82b5

File tree

6 files changed

+47
-51
lines changed

6 files changed

+47
-51
lines changed

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
324324
}
325325

326326
public <T> void writeComplete(AbstractListenerWriteFlushProcessor<T> processor) {
327-
// ignore
327+
throw new IllegalStateException(toString());
328328
}
329329

330330
public <T> void onFlushPossible(AbstractListenerWriteFlushProcessor<T> processor) {

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
5151
private Subscription subscription;
5252

5353
@Nullable
54-
protected volatile T currentData;
54+
private volatile T currentData;
5555

5656
private volatile boolean subscriberCompleted;
5757

@@ -142,21 +142,19 @@ protected void dataReceived(T data) {
142142
this.currentData = data;
143143
}
144144

145-
/**
146-
* Called when the current received data item can be released.
147-
*/
148-
protected abstract void releaseData();
149-
150145
/**
151146
* Whether writing is possible.
152147
*/
153148
protected abstract boolean isWritePossible();
154149

155150
/**
156151
* Write the given item.
152+
* <p><strong>Note:</strong> Sub-classes are responsible for releasing any
153+
* data buffer associated with the item, once fully written, if pooled
154+
* buffers apply to the underlying container.
157155
* @param data the item to write
158-
* @return whether the data was fully written ({@code true})
159-
* and new data can be requested, or otherwise ({@code false})
156+
* @return whether the current data item was written and another one
157+
* requested ({@code true}), or or otherwise if more writes are required.
160158
*/
161159
protected abstract boolean write(T data) throws IOException;
162160

@@ -165,7 +163,7 @@ protected void dataReceived(T data) {
165163
* the next item from the upstream, write Publisher.
166164
* <p>The default implementation is a no-op.
167165
*/
168-
protected void suspendWriting() {
166+
protected void writingPaused() {
169167
}
170168

171169
/**
@@ -275,15 +273,14 @@ public <T> void onWritePossible(AbstractListenerWriteProcessor<T> processor) {
275273
T data = processor.currentData;
276274
Assert.state(data != null, "No data");
277275
try {
278-
boolean writeCompleted = processor.write(data);
279-
if (writeCompleted) {
280-
processor.releaseData();
276+
if (processor.write(data)) {
281277
if (processor.changeState(WRITING, REQUESTED)) {
278+
processor.currentData = null;
282279
if (processor.subscriberCompleted) {
283280
processor.changeStateToComplete(REQUESTED);
284281
}
285282
else {
286-
processor.suspendWriting();
283+
processor.writingPaused();
287284
Assert.state(processor.subscription != null, "No subscription");
288285
processor.subscription.request(1);
289286
}

spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.springframework.core.io.buffer.DataBuffer;
3636
import org.springframework.core.io.buffer.DataBufferFactory;
3737
import org.springframework.core.io.buffer.DataBufferUtils;
38-
import org.springframework.http.HttpStatus;
3938
import org.springframework.http.MediaType;
4039
import org.springframework.http.ResponseCookie;
4140
import org.springframework.lang.Nullable;
@@ -303,15 +302,6 @@ protected boolean isWritePossible() {
303302
return ServletServerHttpResponse.this.isWritePossible();
304303
}
305304

306-
@Override
307-
protected void releaseData() {
308-
if (logger.isTraceEnabled()) {
309-
logger.trace("releaseData: " + this.currentData);
310-
}
311-
DataBufferUtils.release(this.currentData);
312-
this.currentData = null;
313-
}
314-
315305
@Override
316306
protected boolean isDataEmpty(DataBuffer dataBuffer) {
317307
return dataBuffer.readableByteCount() == 0;
@@ -335,11 +325,15 @@ protected boolean write(DataBuffer dataBuffer) throws IOException {
335325
if (this.logger.isTraceEnabled()) {
336326
this.logger.trace("written: " + written + " total: " + remaining);
337327
}
338-
return written == remaining;
339-
}
340-
else {
341-
return false;
328+
if (written == remaining) {
329+
if (logger.isTraceEnabled()) {
330+
logger.trace("releaseData: " + dataBuffer);
331+
}
332+
DataBufferUtils.release(dataBuffer);
333+
return true;
334+
}
342335
}
336+
return false;
343337
}
344338

345339
@Override

spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,15 @@ protected boolean write(DataBuffer dataBuffer) throws IOException {
193193
if (logger.isTraceEnabled()) {
194194
logger.trace("written: " + written + " total: " + total);
195195
}
196-
return written == total;
196+
if (written != total) {
197+
return false;
198+
}
199+
if (logger.isTraceEnabled()) {
200+
logger.trace("releaseData: " + dataBuffer);
201+
}
202+
DataBufferUtils.release(dataBuffer);
203+
this.byteBuffer = null;
204+
return true;
197205
}
198206

199207
private int writeByteBuffer(ByteBuffer byteBuffer) throws IOException {
@@ -213,24 +221,13 @@ protected void dataReceived(DataBuffer dataBuffer) {
213221
this.byteBuffer = dataBuffer.asByteBuffer();
214222
}
215223

216-
@Override
217-
protected void releaseData() {
218-
if (logger.isTraceEnabled()) {
219-
logger.trace("releaseData: " + this.currentData);
220-
}
221-
DataBufferUtils.release(this.currentData);
222-
this.currentData = null;
223-
224-
this.byteBuffer = null;
225-
}
226-
227224
@Override
228225
protected boolean isDataEmpty(DataBuffer dataBuffer) {
229226
return (dataBuffer.readableByteCount() == 0);
230227
}
231228

232229
@Override
233-
protected void suspendWriting() {
230+
protected void writingPaused() {
234231
this.channel.suspendWrites();
235232
}
236233

spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,9 @@ public Mono<Void> send(Publisher<WebSocketMessage> messages) {
153153

154154
/**
155155
* Send the given WebSocket message.
156+
* <p><strong>Note:</strong> Sub-classes are responsible for releasing the
157+
* payload data buffer, once fully written, if pooled buffers apply to the
158+
* underlying container.
156159
*/
157160
protected abstract boolean sendMessage(WebSocketMessage message) throws IOException;
158161

@@ -268,19 +271,14 @@ protected boolean write(WebSocketMessage message) throws IOException {
268271
return sendMessage(message);
269272
}
270273

271-
@Override
272-
protected void releaseData() {
273-
this.currentData = null;
274-
}
275-
276274
@Override
277275
protected boolean isDataEmpty(WebSocketMessage message) {
278276
return (message.getPayload().readableByteCount() == 0);
279277
}
280278

281279
@Override
282280
protected boolean isWritePossible() {
283-
return (this.isReady && this.currentData != null);
281+
return (this.isReady);
284282
}
285283

286284
/**

spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
import reactor.core.publisher.Mono;
2828
import reactor.core.publisher.MonoProcessor;
2929

30+
import org.springframework.core.io.buffer.DataBuffer;
3031
import org.springframework.core.io.buffer.DataBufferFactory;
32+
import org.springframework.core.io.buffer.DataBufferUtils;
3133
import org.springframework.lang.Nullable;
3234
import org.springframework.util.ObjectUtils;
3335
import org.springframework.web.reactive.socket.CloseStatus;
@@ -78,19 +80,19 @@ protected boolean sendMessage(WebSocketMessage message) throws IOException {
7880
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
7981
getSendProcessor().setReadyToSend(false);
8082
String text = new String(buffer.array(), StandardCharsets.UTF_8);
81-
WebSockets.sendText(text, getDelegate(), new SendProcessorCallback());
83+
WebSockets.sendText(text, getDelegate(), new SendProcessorCallback(message.getPayload()));
8284
}
8385
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
8486
getSendProcessor().setReadyToSend(false);
85-
WebSockets.sendBinary(buffer, getDelegate(), new SendProcessorCallback());
87+
WebSockets.sendBinary(buffer, getDelegate(), new SendProcessorCallback(message.getPayload()));
8688
}
8789
else if (WebSocketMessage.Type.PING.equals(message.getType())) {
8890
getSendProcessor().setReadyToSend(false);
89-
WebSockets.sendPing(buffer, getDelegate(), new SendProcessorCallback());
91+
WebSockets.sendPing(buffer, getDelegate(), new SendProcessorCallback(message.getPayload()));
9092
}
9193
else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
9294
getSendProcessor().setReadyToSend(false);
93-
WebSockets.sendPong(buffer, getDelegate(), new SendProcessorCallback());
95+
WebSockets.sendPong(buffer, getDelegate(), new SendProcessorCallback(message.getPayload()));
9496
}
9597
else {
9698
throw new IllegalArgumentException("Unexpected message type: " + message.getType());
@@ -110,14 +112,22 @@ public Mono<Void> close(CloseStatus status) {
110112

111113
private final class SendProcessorCallback implements WebSocketCallback<Void> {
112114

115+
private final DataBuffer payload;
116+
117+
SendProcessorCallback(DataBuffer payload) {
118+
this.payload = payload;
119+
}
120+
113121
@Override
114122
public void complete(WebSocketChannel channel, Void context) {
123+
DataBufferUtils.release(this.payload);
115124
getSendProcessor().setReadyToSend(true);
116125
getSendProcessor().onWritePossible();
117126
}
118127

119128
@Override
120129
public void onError(WebSocketChannel channel, Void context, Throwable throwable) {
130+
DataBufferUtils.release(this.payload);
121131
getSendProcessor().cancel();
122132
getSendProcessor().onError(throwable);
123133
}

0 commit comments

Comments
 (0)