Skip to content

Commit 8714648

Browse files
committed
Limits on input stream in codecs
- Add maxInMemorySize property to Decoder and HttpMessageReader implementations that aggregate input to trigger DataBufferLimitException when reached. - For codecs that call DataBufferUtils#join, there is now an overloaded variant with a maxInMemorySize extra argument. Internally, a custom LimitedDataBufferList is used to count and enforce the limit. - Jackson2Tokenizer and XmlEventDecoder support those limits per streamed JSON object. - Configurable limits for multipart requests with Synchronoss NIO. - Centralized maxInMemorySize exposed via CodecConfigurer along with ability to plug in an instance of MultipartHttpMessageWrite. Closes gh-23884
1 parent cf1b762 commit 8714648

25 files changed

+1169
-191
lines changed

spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,40 @@
4747
*/
4848
public abstract class AbstractDataBufferDecoder<T> extends AbstractDecoder<T> {
4949

50+
private int maxInMemorySize = -1;
51+
5052

5153
protected AbstractDataBufferDecoder(MimeType... supportedMimeTypes) {
5254
super(supportedMimeTypes);
5355
}
5456

5557

58+
/**
59+
* Configure a limit on the number of bytes that can be buffered whenever
60+
* the input stream needs to be aggregated. This can be a result of
61+
* decoding to a single {@code DataBuffer},
62+
* {@link java.nio.ByteBuffer ByteBuffer}, {@code byte[]},
63+
* {@link org.springframework.core.io.Resource Resource}, {@code String}, etc.
64+
* It can also occur when splitting the input stream, e.g. delimited text,
65+
* in which case the limit applies to data buffered between delimiters.
66+
* <p>By default in 5.1 this is set to -1, unlimited. In 5.2 the default
67+
* value for this limit is set to 256K.
68+
* @param byteCount the max number of bytes to buffer, or -1 for unlimited
69+
* @since 5.1.11
70+
*/
71+
public void setMaxInMemorySize(int byteCount) {
72+
this.maxInMemorySize = byteCount;
73+
}
74+
75+
/**
76+
* Return the {@link #setMaxInMemorySize configured} byte count limit.
77+
* @since 5.1.11
78+
*/
79+
public int getMaxInMemorySize() {
80+
return this.maxInMemorySize;
81+
}
82+
83+
5684
@Override
5785
public Flux<T> decode(Publisher<DataBuffer> input, ResolvableType elementType,
5886
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
@@ -64,7 +92,7 @@ public Flux<T> decode(Publisher<DataBuffer> input, ResolvableType elementType,
6492
public Mono<T> decodeToMono(Publisher<DataBuffer> input, ResolvableType elementType,
6593
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
6694

67-
return DataBufferUtils.join(input)
95+
return DataBufferUtils.join(input, this.maxInMemorySize)
6896
.map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints));
6997
}
7098

spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,17 @@
2525
import java.util.Map;
2626
import java.util.concurrent.ConcurrentHashMap;
2727
import java.util.concurrent.ConcurrentMap;
28+
import java.util.function.Consumer;
2829

2930
import org.reactivestreams.Publisher;
3031
import reactor.core.publisher.Flux;
3132

3233
import org.springframework.core.ResolvableType;
3334
import org.springframework.core.io.buffer.DataBuffer;
35+
import org.springframework.core.io.buffer.DataBufferLimitException;
3436
import org.springframework.core.io.buffer.DataBufferUtils;
3537
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
38+
import org.springframework.core.io.buffer.LimitedDataBufferList;
3639
import org.springframework.core.io.buffer.PooledDataBuffer;
3740
import org.springframework.core.log.LogFormatUtils;
3841
import org.springframework.lang.Nullable;
@@ -92,10 +95,16 @@ public Flux<String> decode(Publisher<DataBuffer> input, ResolvableType elementTy
9295

9396
List<byte[]> delimiterBytes = getDelimiterBytes(mimeType);
9497

98+
// TODO: Drop Consumer and use bufferUntil with Supplier<LimistedDataBufferList> (reactor-core#1925)
99+
// TODO: Drop doOnDiscard(LimitedDataBufferList.class, ...) (reactor-core#1924)
100+
LimitedDataBufferConsumer limiter = new LimitedDataBufferConsumer(getMaxInMemorySize());
101+
95102
Flux<DataBuffer> inputFlux = Flux.from(input)
96103
.flatMapIterable(buffer -> splitOnDelimiter(buffer, delimiterBytes))
104+
.doOnNext(limiter)
97105
.bufferUntil(buffer -> buffer == END_FRAME)
98106
.map(StringDecoder::joinUntilEndFrame)
107+
.doOnDiscard(LimitedDataBufferList.class, LimitedDataBufferList::releaseAndClear)
99108
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
100109

101110
return super.decode(inputFlux, elementType, mimeType, hints);
@@ -283,4 +292,35 @@ public static StringDecoder allMimeTypes(List<String> delimiters, boolean stripD
283292
new MimeType("text", "plain", DEFAULT_CHARSET), MimeTypeUtils.ALL);
284293
}
285294

295+
296+
/**
297+
* Temporary measure for reactor-core#1925.
298+
* Consumer that adds to a {@link LimitedDataBufferList} to enforce limits.
299+
*/
300+
private static class LimitedDataBufferConsumer implements Consumer<DataBuffer> {
301+
302+
private final LimitedDataBufferList bufferList;
303+
304+
305+
public LimitedDataBufferConsumer(int maxInMemorySize) {
306+
this.bufferList = new LimitedDataBufferList(maxInMemorySize);
307+
}
308+
309+
310+
@Override
311+
public void accept(DataBuffer buffer) {
312+
if (buffer == END_FRAME) {
313+
this.bufferList.clear();
314+
}
315+
else {
316+
try {
317+
this.bufferList.add(buffer);
318+
}
319+
catch (DataBufferLimitException ex) {
320+
DataBufferUtils.release(buffer);
321+
throw ex;
322+
}
323+
}
324+
}
325+
}
286326
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2002-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.core.io.buffer;
17+
18+
/**
19+
* Exception that indicates the cumulative number of bytes consumed from a
20+
* stream of {@link DataBuffer DataBuffer}'s exceeded some pre-configured limit.
21+
* This can be raised when data buffers are cached and aggregated, e.g.
22+
* {@link DataBufferUtils#join}. Or it could also be raised when data buffers
23+
* have been released but a parsed representation is being aggregated, e.g. async
24+
* parsing with Jackson.
25+
*
26+
* @author Rossen Stoyanchev
27+
* @since 5.1.11
28+
*/
29+
@SuppressWarnings("serial")
30+
public class DataBufferLimitException extends IllegalStateException {
31+
32+
33+
public DataBufferLimitException(String message) {
34+
super(message);
35+
}
36+
37+
}

spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -437,14 +437,36 @@ public static Consumer<DataBuffer> releaseConsumer() {
437437
* @since 5.0.3
438438
*/
439439
public static Mono<DataBuffer> join(Publisher<DataBuffer> dataBuffers) {
440-
Assert.notNull(dataBuffers, "'dataBuffers' must not be null");
440+
return join(dataBuffers, -1);
441+
}
442+
443+
/**
444+
* Variant of {@link #join(Publisher)} that behaves the same way up until
445+
* the specified max number of bytes to buffer. Once the limit is exceeded,
446+
* {@link DataBufferLimitException} is raised.
447+
* @param buffers the data buffers that are to be composed
448+
* @param maxByteCount the max number of bytes to buffer, or -1 for unlimited
449+
* @return a buffer with the aggregated content, possibly an empty Mono if
450+
* the max number of bytes to buffer is exceeded.
451+
* @throws DataBufferLimitException if maxByteCount is exceeded
452+
* @since 5.1.11
453+
*/
454+
@SuppressWarnings("unchecked")
455+
public static Mono<DataBuffer> join(Publisher<? extends DataBuffer> buffers, int maxByteCount) {
456+
Assert.notNull(buffers, "'dataBuffers' must not be null");
441457

442-
return Flux.from(dataBuffers)
443-
.collectList()
458+
if (buffers instanceof Mono) {
459+
return (Mono<DataBuffer>) buffers;
460+
}
461+
462+
// TODO: Drop doOnDiscard(LimitedDataBufferList.class, ...) (reactor-core#1924)
463+
464+
return Flux.from(buffers)
465+
.collect(() -> new LimitedDataBufferList(maxByteCount), LimitedDataBufferList::add)
444466
.filter(list -> !list.isEmpty())
445467
.map(list -> list.get(0).factory().join(list))
468+
.doOnDiscard(LimitedDataBufferList.class, LimitedDataBufferList::releaseAndClear)
446469
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
447-
448470
}
449471

450472

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Copyright 2002-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.core.io.buffer;
17+
18+
import java.util.ArrayList;
19+
import java.util.Collection;
20+
import java.util.List;
21+
import java.util.function.Predicate;
22+
23+
import reactor.core.publisher.Flux;
24+
25+
/**
26+
* Custom {@link List} to collect data buffers with and enforce a
27+
* limit on the total number of bytes buffered. For use with "collect" or
28+
* other buffering operators in declarative APIs, e.g. {@link Flux}.
29+
*
30+
* <p>Adding elements increases the byte count and if the limit is exceeded,
31+
* {@link DataBufferLimitException} is raised. {@link #clear()} resets the
32+
* count. Remove and set are not supported.
33+
*
34+
* <p><strong>Note:</strong> This class does not automatically release the
35+
* buffers it contains. It is usually preferable to use hooks such as
36+
* {@link Flux#doOnDiscard} that also take care of cancel and error signals,
37+
* or otherwise {@link #releaseAndClear()} can be used.
38+
*
39+
* @author Rossen Stoyanchev
40+
* @since 5.1.11
41+
*/
42+
@SuppressWarnings("serial")
43+
public class LimitedDataBufferList extends ArrayList<DataBuffer> {
44+
45+
private final int maxByteCount;
46+
47+
private int byteCount;
48+
49+
50+
public LimitedDataBufferList(int maxByteCount) {
51+
this.maxByteCount = maxByteCount;
52+
}
53+
54+
55+
@Override
56+
public boolean add(DataBuffer buffer) {
57+
boolean result = super.add(buffer);
58+
if (result) {
59+
updateCount(buffer.readableByteCount());
60+
}
61+
return result;
62+
}
63+
64+
@Override
65+
public void add(int index, DataBuffer buffer) {
66+
super.add(index, buffer);
67+
updateCount(buffer.readableByteCount());
68+
}
69+
70+
@Override
71+
public boolean addAll(Collection<? extends DataBuffer> collection) {
72+
boolean result = super.addAll(collection);
73+
collection.forEach(buffer -> updateCount(buffer.readableByteCount()));
74+
return result;
75+
}
76+
77+
@Override
78+
public boolean addAll(int index, Collection<? extends DataBuffer> collection) {
79+
boolean result = super.addAll(index, collection);
80+
collection.forEach(buffer -> updateCount(buffer.readableByteCount()));
81+
return result;
82+
}
83+
84+
private void updateCount(int bytesToAdd) {
85+
if (this.maxByteCount < 0) {
86+
return;
87+
}
88+
if (bytesToAdd > Integer.MAX_VALUE - this.byteCount) {
89+
raiseLimitException();
90+
}
91+
else {
92+
this.byteCount += bytesToAdd;
93+
if (this.byteCount > this.maxByteCount) {
94+
raiseLimitException();
95+
}
96+
}
97+
}
98+
99+
private void raiseLimitException() {
100+
// Do not release here, it's likely down via doOnDiscard..
101+
throw new DataBufferLimitException(
102+
"Exceeded limit on max bytes to buffer : " + this.maxByteCount);
103+
}
104+
105+
@Override
106+
public DataBuffer remove(int index) {
107+
throw new UnsupportedOperationException();
108+
}
109+
110+
@Override
111+
public boolean remove(Object o) {
112+
throw new UnsupportedOperationException();
113+
}
114+
115+
@Override
116+
protected void removeRange(int fromIndex, int toIndex) {
117+
throw new UnsupportedOperationException();
118+
}
119+
120+
@Override
121+
public boolean removeAll(Collection<?> c) {
122+
throw new UnsupportedOperationException();
123+
}
124+
125+
@Override
126+
public boolean removeIf(Predicate<? super DataBuffer> filter) {
127+
throw new UnsupportedOperationException();
128+
}
129+
130+
@Override
131+
public DataBuffer set(int index, DataBuffer element) {
132+
throw new UnsupportedOperationException();
133+
}
134+
135+
@Override
136+
public void clear() {
137+
this.byteCount = 0;
138+
super.clear();
139+
}
140+
141+
/**
142+
* Shortcut to {@link DataBufferUtils#release release} all data buffers and
143+
* then {@link #clear()}.
144+
*/
145+
public void releaseAndClear() {
146+
forEach(buf -> {
147+
try {
148+
DataBufferUtils.release(buf);
149+
}
150+
catch (Throwable ex) {
151+
// Keep going..
152+
}
153+
});
154+
clear();
155+
}
156+
157+
}

spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import org.springframework.core.ResolvableType;
3131
import org.springframework.core.io.buffer.DataBuffer;
32+
import org.springframework.core.io.buffer.DataBufferLimitException;
3233
import org.springframework.util.MimeType;
3334
import org.springframework.util.MimeTypeUtils;
3435

@@ -126,6 +127,20 @@ public void decodeNewLine() {
126127
.verify());
127128
}
128129

130+
@Test
131+
public void decodeNewLineWithLimit() {
132+
Flux<DataBuffer> input = Flux.just(
133+
stringBuffer("abc\n"),
134+
stringBuffer("defg\n"),
135+
stringBuffer("hijkl\n")
136+
);
137+
this.decoder.setMaxInMemorySize(4);
138+
139+
testDecode(input, String.class, step ->
140+
step.expectNext("abc", "defg")
141+
.verifyError(DataBufferLimitException.class));
142+
}
143+
129144
@Test
130145
public void decodeNewLineIncludeDelimiters() {
131146
this.decoder = StringDecoder.allMimeTypes(StringDecoder.DEFAULT_DELIMITERS, false);

0 commit comments

Comments
 (0)