Skip to content

Commit c543557

Browse files
committed
Buffer leak fixes
Address issues where buffers are allocated (and cached somehow) at or before subscription, and before explicit demand. The commit adds tests proving the leaks and fixes. The common thread for all tests is a "zero demand" subscriber that subscribes but does not request, and then cancels without consuming anything. Closes gh-22107
1 parent 65b4607 commit c543557

File tree

16 files changed

+504
-211
lines changed

16 files changed

+504
-211
lines changed

spring-core/src/main/java/org/springframework/core/codec/AbstractSingleValueEncoder.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@
2424
import org.springframework.core.ResolvableType;
2525
import org.springframework.core.io.buffer.DataBuffer;
2626
import org.springframework.core.io.buffer.DataBufferFactory;
27+
import org.springframework.core.io.buffer.PooledDataBuffer;
2728
import org.springframework.lang.Nullable;
2829
import org.springframework.util.MimeType;
2930

@@ -47,9 +48,10 @@ public AbstractSingleValueEncoder(MimeType... supportedMimeTypes) {
4748
public final Flux<DataBuffer> encode(Publisher<? extends T> inputStream, DataBufferFactory bufferFactory,
4849
ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
4950

50-
return Flux.from(inputStream).
51-
take(1).
52-
concatMap(t -> encode(t, bufferFactory, elementType, mimeType, hints));
51+
return Flux.from(inputStream)
52+
.take(1)
53+
.concatMap(value -> encode(value, bufferFactory, elementType, mimeType, hints))
54+
.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release);
5355
}
5456

5557
/**

spring-core/src/main/java/org/springframework/core/codec/ResourceRegionEncoder.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package org.springframework.core.codec;
1818

1919
import java.io.IOException;
20-
import java.nio.ByteBuffer;
2120
import java.nio.charset.StandardCharsets;
2221
import java.util.Map;
2322
import java.util.OptionalLong;
@@ -89,24 +88,22 @@ public Flux<DataBuffer> encode(Publisher<? extends ResourceRegion> inputStream,
8988
return Mono.from(inputStream)
9089
.flatMapMany(region -> {
9190
if (!region.getResource().isReadable()) {
92-
return Flux.error(new EncodingException("Resource " +
93-
region.getResource() + " is not readable"));
91+
return Flux.error(new EncodingException(
92+
"Resource " + region.getResource() + " is not readable"));
9493
}
95-
9694
return writeResourceRegion(region, bufferFactory, hints);
9795
});
9896
}
9997
else {
10098
final String boundaryString = Hints.getRequiredHint(hints, BOUNDARY_STRING_HINT);
10199
byte[] startBoundary = getAsciiBytes("\r\n--" + boundaryString + "\r\n");
102-
byte[] contentType =
103-
(mimeType != null ? getAsciiBytes("Content-Type: " + mimeType + "\r\n") : new byte[0]);
100+
byte[] contentType = mimeType != null ? getAsciiBytes("Content-Type: " + mimeType + "\r\n") : new byte[0];
104101

105102
return Flux.from(inputStream).
106103
concatMap(region -> {
107104
if (!region.getResource().isReadable()) {
108-
return Flux.error(new EncodingException("Resource " +
109-
region.getResource() + " is not readable"));
105+
return Flux.error(new EncodingException(
106+
"Resource " + region.getResource() + " is not readable"));
110107
}
111108
else {
112109
return Flux.concat(
@@ -121,11 +118,10 @@ public Flux<DataBuffer> encode(Publisher<? extends ResourceRegion> inputStream,
121118
private Flux<DataBuffer> getRegionPrefix(DataBufferFactory bufferFactory, byte[] startBoundary,
122119
byte[] contentType, ResourceRegion region) {
123120

124-
return Flux.defer(() -> Flux.just(
125-
bufferFactory.allocateBuffer(startBoundary.length).write(startBoundary),
126-
bufferFactory.allocateBuffer(contentType.length).write(contentType),
127-
bufferFactory.wrap(ByteBuffer.wrap(getContentRangeHeader(region))))
128-
);
121+
return Flux.just(
122+
bufferFactory.wrap(startBoundary),
123+
bufferFactory.wrap(contentType),
124+
bufferFactory.wrap(getContentRangeHeader(region))); // only wrapping, no allocation
129125
}
130126

131127
private Flux<DataBuffer> writeResourceRegion(
@@ -146,8 +142,7 @@ private Flux<DataBuffer> writeResourceRegion(
146142

147143
private Flux<DataBuffer> getRegionSuffix(DataBufferFactory bufferFactory, String boundaryString) {
148144
byte[] endBoundary = getAsciiBytes("\r\n--" + boundaryString + "--");
149-
return Flux.defer(() -> Flux.just(
150-
bufferFactory.allocateBuffer(endBoundary.length).write(endBoundary)));
145+
return Flux.just(bufferFactory.wrap(endBoundary));
151146
}
152147

153148
private byte[] getAsciiBytes(String in) {

spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

Lines changed: 45 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -82,40 +82,36 @@ public static Flux<DataBuffer> readInputStream(
8282
* Obtain a {@link ReadableByteChannel} from the given supplier, and read it into a
8383
* {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated.
8484
* @param channelSupplier the supplier for the channel to read from
85-
* @param dataBufferFactory the factory to create data buffers with
85+
* @param bufferFactory the factory to create data buffers with
8686
* @param bufferSize the maximum size of the data buffers
8787
* @return a flux of data buffers read from the given channel
8888
*/
8989
public static Flux<DataBuffer> readByteChannel(
90-
Callable<ReadableByteChannel> channelSupplier, DataBufferFactory dataBufferFactory, int bufferSize) {
90+
Callable<ReadableByteChannel> channelSupplier, DataBufferFactory bufferFactory, int bufferSize) {
9191

9292
Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
93-
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
93+
Assert.notNull(bufferFactory, "'dataBufferFactory' must not be null");
9494
Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
9595

9696
return Flux.using(channelSupplier,
97-
channel -> {
98-
ReadableByteChannelGenerator generator =
99-
new ReadableByteChannelGenerator(channel, dataBufferFactory,
100-
bufferSize);
101-
return Flux.generate(generator);
102-
},
103-
DataBufferUtils::closeChannel)
104-
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
97+
channel -> Flux.generate(new ReadableByteChannelGenerator(channel, bufferFactory, bufferSize)),
98+
DataBufferUtils::closeChannel);
99+
100+
// No doOnDiscard as operators used do not cache
105101
}
106102

107103
/**
108104
* Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a
109105
* {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated.
110106
* @param channelSupplier the supplier for the channel to read from
111-
* @param dataBufferFactory the factory to create data buffers with
107+
* @param bufferFactory the factory to create data buffers with
112108
* @param bufferSize the maximum size of the data buffers
113109
* @return a flux of data buffers read from the given channel
114110
*/
115111
public static Flux<DataBuffer> readAsynchronousFileChannel(
116-
Callable<AsynchronousFileChannel> channelSupplier, DataBufferFactory dataBufferFactory, int bufferSize) {
112+
Callable<AsynchronousFileChannel> channelSupplier, DataBufferFactory bufferFactory, int bufferSize) {
117113

118-
return readAsynchronousFileChannel(channelSupplier, 0, dataBufferFactory, bufferSize);
114+
return readAsynchronousFileChannel(channelSupplier, 0, bufferFactory, bufferSize);
119115
}
120116

121117
/**
@@ -124,32 +120,30 @@ public static Flux<DataBuffer> readAsynchronousFileChannel(
124120
* channel when the flux is terminated.
125121
* @param channelSupplier the supplier for the channel to read from
126122
* @param position the position to start reading from
127-
* @param dataBufferFactory the factory to create data buffers with
123+
* @param bufferFactory the factory to create data buffers with
128124
* @param bufferSize the maximum size of the data buffers
129125
* @return a flux of data buffers read from the given channel
130126
*/
131127
public static Flux<DataBuffer> readAsynchronousFileChannel(Callable<AsynchronousFileChannel> channelSupplier,
132-
long position, DataBufferFactory dataBufferFactory, int bufferSize) {
128+
long position, DataBufferFactory bufferFactory, int bufferSize) {
133129

134130
Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
135-
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
131+
Assert.notNull(bufferFactory, "'dataBufferFactory' must not be null");
136132
Assert.isTrue(position >= 0, "'position' must be >= 0");
137133
Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
138134

139-
DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize);
140-
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
141-
142-
Flux<DataBuffer> result = Flux.using(channelSupplier,
135+
Flux<DataBuffer> flux = Flux.using(channelSupplier,
143136
channel -> Flux.create(sink -> {
144-
AsynchronousFileChannelReadCompletionHandler completionHandler =
145-
new AsynchronousFileChannelReadCompletionHandler(channel,
146-
sink, position, dataBufferFactory, bufferSize);
147-
channel.read(byteBuffer, position, dataBuffer, completionHandler);
148-
sink.onDispose(completionHandler::dispose);
137+
ReadCompletionHandler handler =
138+
new ReadCompletionHandler(channel, sink, position, bufferFactory, bufferSize);
139+
DataBuffer dataBuffer = bufferFactory.allocateBuffer(bufferSize);
140+
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
141+
channel.read(byteBuffer, position, dataBuffer, handler);
142+
sink.onDispose(handler::dispose);
149143
}),
150144
DataBufferUtils::closeChannel);
151145

152-
return result.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
146+
return flux.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
153147
}
154148

155149
/**
@@ -246,8 +240,7 @@ public static Flux<DataBuffer> write(Publisher<DataBuffer> source, WritableByteC
246240

247241
Flux<DataBuffer> flux = Flux.from(source);
248242
return Flux.create(sink -> {
249-
WritableByteChannelSubscriber subscriber =
250-
new WritableByteChannelSubscriber(sink, channel);
243+
WritableByteChannelSubscriber subscriber = new WritableByteChannelSubscriber(sink, channel);
251244
sink.onDispose(subscriber);
252245
flux.subscribe(subscriber);
253246
});
@@ -292,10 +285,9 @@ public static Flux<DataBuffer> write(
292285

293286
Flux<DataBuffer> flux = Flux.from(source);
294287
return Flux.create(sink -> {
295-
AsynchronousFileChannelWriteCompletionHandler completionHandler =
296-
new AsynchronousFileChannelWriteCompletionHandler(sink, channel, position);
297-
sink.onDispose(completionHandler);
298-
flux.subscribe(completionHandler);
288+
WriteCompletionHandler handler = new WriteCompletionHandler(sink, channel, position);
289+
sink.onDispose(handler);
290+
flux.subscribe(handler);
299291
});
300292
}
301293

@@ -326,21 +318,21 @@ public static Flux<DataBuffer> takeUntilByteCount(Publisher<DataBuffer> publishe
326318
Assert.notNull(publisher, "Publisher must not be null");
327319
Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number");
328320

329-
return Flux.defer(() -> {
330-
AtomicLong countDown = new AtomicLong(maxByteCount);
331-
return Flux.from(publisher)
332-
.map(buffer -> {
333-
long remainder = countDown.addAndGet(-buffer.readableByteCount());
334-
if (remainder < 0) {
335-
int length = buffer.readableByteCount() + (int) remainder;
336-
return buffer.slice(0, length);
337-
}
338-
else {
339-
return buffer;
340-
}
341-
})
342-
.takeUntil(buffer -> countDown.get() <= 0);
343-
}); // no doOnDiscard necessary, as this method does not drop buffers
321+
AtomicLong countDown = new AtomicLong(maxByteCount);
322+
return Flux.from(publisher)
323+
.map(buffer -> {
324+
long remainder = countDown.addAndGet(-buffer.readableByteCount());
325+
if (remainder < 0) {
326+
int length = buffer.readableByteCount() + (int) remainder;
327+
return buffer.slice(0, length);
328+
}
329+
else {
330+
return buffer;
331+
}
332+
})
333+
.takeUntil(buffer -> countDown.get() <= 0);
334+
335+
// No doOnDiscard as operators used do not cache (and drop) buffers
344336
}
345337

346338
/**
@@ -487,8 +479,7 @@ public void accept(SynchronousSink<DataBuffer> sink) {
487479
}
488480

489481

490-
private static class AsynchronousFileChannelReadCompletionHandler
491-
implements CompletionHandler<Integer, DataBuffer> {
482+
private static class ReadCompletionHandler implements CompletionHandler<Integer, DataBuffer> {
492483

493484
private final AsynchronousFileChannel channel;
494485

@@ -502,7 +493,7 @@ private static class AsynchronousFileChannelReadCompletionHandler
502493

503494
private final AtomicBoolean disposed = new AtomicBoolean();
504495

505-
public AsynchronousFileChannelReadCompletionHandler(AsynchronousFileChannel channel,
496+
public ReadCompletionHandler(AsynchronousFileChannel channel,
506497
FluxSink<DataBuffer> sink, long position, DataBufferFactory dataBufferFactory, int bufferSize) {
507498

508499
this.channel = channel;
@@ -586,7 +577,7 @@ protected void hookOnComplete() {
586577
}
587578

588579

589-
private static class AsynchronousFileChannelWriteCompletionHandler extends BaseSubscriber<DataBuffer>
580+
private static class WriteCompletionHandler extends BaseSubscriber<DataBuffer>
590581
implements CompletionHandler<Integer, ByteBuffer> {
591582

592583
private final FluxSink<DataBuffer> sink;
@@ -601,7 +592,7 @@ private static class AsynchronousFileChannelWriteCompletionHandler extends BaseS
601592

602593
private final AtomicReference<DataBuffer> dataBuffer = new AtomicReference<>();
603594

604-
public AsynchronousFileChannelWriteCompletionHandler(
595+
public WriteCompletionHandler(
605596
FluxSink<DataBuffer> sink, AsynchronousFileChannel channel, long position) {
606597

607598
this.sink = sink;

0 commit comments

Comments
 (0)