Skip to content

Commit 33476a2

Browse files
committed
Backport fixes for discarding data buffers
Closes gh-26232
1 parent a8091b9 commit 33476a2

File tree

3 files changed

+52
-5
lines changed

3 files changed

+52
-5
lines changed

spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ public Mono<Void> write(Publisher<? extends T> inputStream, ResolvableType eleme
128128
message.getHeaders().setContentLength(buffer.readableByteCount());
129129
return message.writeWith(Mono.just(buffer)
130130
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
131-
});
131+
})
132+
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
132133
}
133134

134135
if (isStreamingMediaType(contentType)) {

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -203,10 +203,28 @@ public final Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
203203
// We must resolve value first however, for a chance to handle potential error.
204204
if (body instanceof Mono) {
205205
return ((Mono<? extends DataBuffer>) body)
206-
.flatMap(buffer -> doCommit(() ->
207-
writeWithInternal(Mono.fromCallable(() -> buffer)
208-
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release))))
209-
.doOnError(t -> getHeaders().clearContentHeaders());
206+
.flatMap(buffer -> {
207+
AtomicReference<Boolean> subscribed = new AtomicReference<>(false);
208+
return doCommit(
209+
() -> {
210+
try {
211+
return writeWithInternal(Mono.fromCallable(() -> buffer)
212+
.doOnSubscribe(s -> subscribed.set(true))
213+
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
214+
}
215+
catch (Throwable ex) {
216+
return Mono.error(ex);
217+
}
218+
})
219+
.doOnError(ex -> DataBufferUtils.release(buffer))
220+
.doOnCancel(() -> {
221+
if (!subscribed.get()) {
222+
DataBufferUtils.release(buffer);
223+
}
224+
});
225+
})
226+
.doOnError(t -> getHeaders().clearContentHeaders())
227+
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
210228
}
211229
else {
212230
return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeWithInternal(inner)))

spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,32 @@
1919
import java.nio.ByteBuffer;
2020
import java.nio.charset.StandardCharsets;
2121
import java.util.ArrayList;
22+
import java.util.Collections;
2223
import java.util.List;
24+
import java.util.Map;
2325
import java.util.function.Consumer;
2426
import java.util.function.Supplier;
2527

2628
import org.junit.jupiter.api.Test;
2729
import org.reactivestreams.Publisher;
2830
import reactor.core.publisher.Flux;
2931
import reactor.core.publisher.Mono;
32+
import reactor.netty.channel.AbortedException;
3033
import reactor.test.StepVerifier;
3134

35+
import org.springframework.core.ResolvableType;
3236
import org.springframework.core.io.buffer.DataBuffer;
3337
import org.springframework.core.io.buffer.DefaultDataBuffer;
3438
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
39+
import org.springframework.core.testfixture.io.buffer.LeakAwareDataBufferFactory;
3540
import org.springframework.http.HttpHeaders;
3641
import org.springframework.http.MediaType;
3742
import org.springframework.http.ResponseCookie;
43+
import org.springframework.http.codec.EncoderHttpMessageWriter;
44+
import org.springframework.http.codec.HttpMessageWriter;
45+
import org.springframework.http.codec.json.Jackson2JsonEncoder;
46+
import org.springframework.web.testfixture.http.server.reactive.MockServerHttpRequest;
47+
import org.springframework.web.testfixture.http.server.reactive.MockServerHttpResponse;
3848

3949
import static org.assertj.core.api.Assertions.assertThat;
4050

@@ -176,6 +186,24 @@ void beforeCommitErrorShouldLeaveResponseNotCommitted() {
176186
});
177187
}
178188

189+
@Test // gh-26232
190+
void monoResponseShouldNotLeakIfCancelled() {
191+
LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory();
192+
MockServerHttpRequest request = MockServerHttpRequest.get("/").build();
193+
MockServerHttpResponse response = new MockServerHttpResponse(bufferFactory);
194+
response.setWriteHandler(flux -> {
195+
throw AbortedException.beforeSend();
196+
});
197+
198+
HttpMessageWriter<Object> messageWriter = new EncoderHttpMessageWriter<>(new Jackson2JsonEncoder());
199+
Mono<Void> result = messageWriter.write(Mono.just(Collections.singletonMap("foo", "bar")),
200+
ResolvableType.forClass(Mono.class), ResolvableType.forClass(Map.class), null,
201+
request, response, Collections.emptyMap());
202+
203+
StepVerifier.create(result).expectError(AbortedException.class).verify();
204+
205+
bufferFactory.checkForLeaks();
206+
}
179207

180208
private DefaultDataBuffer wrap(String a) {
181209
return new DefaultDataBufferFactory().wrap(ByteBuffer.wrap(a.getBytes(StandardCharsets.UTF_8)));

0 commit comments

Comments
 (0)