Skip to content

Commit 7efb620

Browse files
committed
MessagingRSocket correctly handles unconsumed input
Closes gh-24741
1 parent 0d42a1b commit 7efb620

File tree

2 files changed

+11
-3
lines changed

2 files changed

+11
-3
lines changed

spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/MessagingRSocket.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -174,7 +174,7 @@ private Flux<Payload> handleAndReply(Payload firstPayload, FrameType frameType,
174174
.doFinally(s -> {
175175
// Subscription should have happened by now due to ChannelSendOperator
176176
if (!read.get()) {
177-
buffers.subscribe(DataBufferUtils::release);
177+
firstPayload.release();
178178
}
179179
})
180180
.thenMany(Flux.defer(() -> replyMono.isTerminated() ?

spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
2727
import io.rsocket.RSocket;
2828
import io.rsocket.RSocketFactory;
2929
import io.rsocket.SocketAcceptor;
30+
import io.rsocket.exceptions.ApplicationErrorException;
3031
import io.rsocket.frame.decoder.PayloadDecoder;
3132
import io.rsocket.plugins.RSocketInterceptor;
3233
import io.rsocket.transport.netty.server.CloseableChannel;
@@ -149,6 +150,13 @@ void ignoreInput() {
149150
StepVerifier.create(result).expectNext("bar").thenCancel().verify(Duration.ofSeconds(5));
150151
}
151152

153+
@Test // gh-24741
154+
void noSuchRouteOnChannelInteraction() {
155+
Flux<String> input = Flux.just("foo", "bar", "baz");
156+
Flux<String> result = requester.route("no-such-route").data(input).retrieveFlux(String.class);
157+
StepVerifier.create(result).expectError(ApplicationErrorException.class).verify(Duration.ofSeconds(5));
158+
}
159+
152160

153161
@Controller
154162
static class ServerController {

0 commit comments

Comments
 (0)