Skip to content

Commit e36d416

Browse files
committed
Filter empty buffers PayloadMethodArgumentResolver
An empty buffer for RSocket is an empty message and while some codecs such as StringDecoder may be able turn that into something such as an empty String, for other formats such as JSON it is invalid input. Closes gh-26344
1 parent d387d9a commit e36d416

File tree

2 files changed

+30
-8
lines changed

2 files changed

+30
-8
lines changed

spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/reactive/PayloadMethodArgumentResolver.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -38,6 +38,7 @@
3838
import org.springframework.core.codec.Decoder;
3939
import org.springframework.core.codec.DecodingException;
4040
import org.springframework.core.io.buffer.DataBuffer;
41+
import org.springframework.core.io.buffer.DataBufferUtils;
4142
import org.springframework.lang.Nullable;
4243
import org.springframework.messaging.Message;
4344
import org.springframework.messaging.MessageHeaders;
@@ -232,6 +233,7 @@ private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message
232233
if (decoder.canDecode(elementType, mimeType)) {
233234
if (adapter != null && adapter.isMultiValue()) {
234235
Flux<?> flux = content
236+
.filter(this::nonEmptyDataBuffer)
235237
.map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
236238
.onErrorResume(ex -> Flux.error(handleReadError(parameter, message, ex)));
237239
if (isContentRequired) {
@@ -245,6 +247,7 @@ private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message
245247
else {
246248
// Single-value (with or without reactive type wrapper)
247249
Mono<?> mono = content.next()
250+
.filter(this::nonEmptyDataBuffer)
248251
.map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
249252
.onErrorResume(ex -> Mono.error(handleReadError(parameter, message, ex)));
250253
if (isContentRequired) {
@@ -262,6 +265,14 @@ private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message
262265
message, parameter, "Cannot decode to [" + targetType + "]" + message));
263266
}
264267

268+
private boolean nonEmptyDataBuffer(DataBuffer buffer) {
269+
if (buffer.readableByteCount() > 0) {
270+
return true;
271+
}
272+
DataBufferUtils.release(buffer);
273+
return false;
274+
}
275+
265276
private Throwable handleReadError(MethodParameter parameter, Message<?> message, Throwable ex) {
266277
return ex instanceof DecodingException ?
267278
new MethodArgumentResolutionException(message, parameter, "Failed to read HTTP message", ex) : ex;

spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,7 +19,6 @@
1919
import java.time.Duration;
2020
import java.util.concurrent.atomic.AtomicInteger;
2121

22-
import io.rsocket.Payload;
2322
import io.rsocket.RSocket;
2423
import io.rsocket.SocketAcceptor;
2524
import io.rsocket.core.RSocketServer;
@@ -43,6 +42,7 @@
4342
import org.springframework.messaging.handler.annotation.Header;
4443
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
4544
import org.springframework.messaging.handler.annotation.MessageMapping;
45+
import org.springframework.messaging.handler.annotation.Payload;
4646
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
4747
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
4848
import org.springframework.stereotype.Controller;
@@ -164,6 +164,12 @@ public void echoChannel() {
164164
.verify(Duration.ofSeconds(5));
165165
}
166166

167+
@Test // gh-26344
168+
public void echoChannelWithEmptyInput() {
169+
Flux<String> result = requester.route("echo-channel-empty").data(Flux.empty()).retrieveFlux(String.class);
170+
StepVerifier.create(result).verifyComplete();
171+
}
172+
167173
@Test
168174
public void metadataPush() {
169175
Flux.just("bar", "baz")
@@ -254,6 +260,11 @@ Flux<String> echoChannel(Flux<String> payloads) {
254260
return payloads.delayElements(Duration.ofMillis(10)).map(payload -> payload + " async");
255261
}
256262

263+
@MessageMapping("echo-channel-empty")
264+
Flux<String> echoChannelEmpty(@Payload(required = false) Flux<String> payloads) {
265+
return payloads.map(payload -> payload + " echoed");
266+
}
267+
257268
@MessageMapping("thrown-exception")
258269
Mono<String> handleAndThrow(String payload) {
259270
throw new IllegalArgumentException("Invalid input error");
@@ -338,29 +349,29 @@ public RSocket apply(RSocket rsocket) {
338349
}
339350

340351
@Override
341-
public Mono<Void> fireAndForget(Payload payload) {
352+
public Mono<Void> fireAndForget(io.rsocket.Payload payload) {
342353
return this.delegate.fireAndForget(payload)
343354
.doOnSuccess(aVoid -> this.fireAndForgetCount.incrementAndGet());
344355
}
345356

346357
@Override
347-
public Mono<Void> metadataPush(Payload payload) {
358+
public Mono<Void> metadataPush(io.rsocket.Payload payload) {
348359
return this.delegate.metadataPush(payload)
349360
.doOnSuccess(aVoid -> this.metadataPushCount.incrementAndGet());
350361
}
351362

352363
@Override
353-
public Mono<Payload> requestResponse(Payload payload) {
364+
public Mono<io.rsocket.Payload> requestResponse(io.rsocket.Payload payload) {
354365
return this.delegate.requestResponse(payload);
355366
}
356367

357368
@Override
358-
public Flux<Payload> requestStream(Payload payload) {
369+
public Flux<io.rsocket.Payload> requestStream(io.rsocket.Payload payload) {
359370
return this.delegate.requestStream(payload);
360371
}
361372

362373
@Override
363-
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
374+
public Flux<io.rsocket.Payload> requestChannel(Publisher<io.rsocket.Payload> payloads) {
364375
return this.delegate.requestChannel(payloads);
365376
}
366377
}

0 commit comments

Comments
 (0)