Skip to content

Commit 973ee9b

Browse files
committed
(Re)introduce DefaultMultipartMessageReader
This commit introduces the DefaultMultipartMessageReader, a fully reactive multipart parser without third party dependencies. An earlier version of this code was introduced in fb642ce, but removed again in 77c24aa because of buffering issues. Closes gh-21659
1 parent 57f868f commit 973ee9b

File tree

18 files changed

+2433
-33
lines changed

18 files changed

+2433
-33
lines changed
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
/*
2+
* Copyright 2002-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.http.codec.multipart;
18+
19+
import java.io.IOException;
20+
import java.nio.charset.StandardCharsets;
21+
import java.nio.file.Files;
22+
import java.nio.file.Path;
23+
import java.nio.file.Paths;
24+
import java.util.Collections;
25+
import java.util.List;
26+
import java.util.Map;
27+
28+
import reactor.core.publisher.Flux;
29+
import reactor.core.publisher.Mono;
30+
import reactor.core.scheduler.Scheduler;
31+
import reactor.core.scheduler.Schedulers;
32+
33+
import org.springframework.core.ResolvableType;
34+
import org.springframework.core.codec.DecodingException;
35+
import org.springframework.core.io.buffer.DataBufferLimitException;
36+
import org.springframework.http.HttpMessage;
37+
import org.springframework.http.MediaType;
38+
import org.springframework.http.ReactiveHttpInputMessage;
39+
import org.springframework.http.codec.HttpMessageReader;
40+
import org.springframework.http.codec.LoggingCodecSupport;
41+
import org.springframework.lang.Nullable;
42+
import org.springframework.util.Assert;
43+
44+
/**
45+
* Default {@code HttpMessageReader} for parsing {@code "multipart/form-data"}
46+
* requests to a stream of {@link Part}s.
47+
*
48+
* <p>In default, non-streaming mode, this message reader stores the
49+
* {@linkplain Part#content() contents} of parts smaller than
50+
* {@link #setMaxInMemorySize(int) maxInMemorySize} in memory, and parts larger
51+
* than that to a temporary file in
52+
* {@link #setFileStorageDirectory(Path) fileStorageDirectory}.
53+
* <p>In {@linkplain #setStreaming(boolean) streaming} mode, the contents of the
54+
* part is streamed directly from the parsed input buffer stream, and not stored
55+
* in memory nor file.
56+
*
57+
* <p>This reader can be provided to {@link MultipartHttpMessageReader} in order
58+
* to aggregate all parts into a Map.
59+
*
60+
* @author Arjen Poutsma
61+
* @since 5.3
62+
*/
63+
public class DefaultPartHttpMessageReader extends LoggingCodecSupport implements HttpMessageReader<Part> {
64+
65+
private static final String IDENTIFIER = "spring-multipart";
66+
67+
private int maxInMemorySize = 256 * 1024;
68+
69+
private int maxHeadersSize = 8 * 1024;
70+
71+
private long maxDiskUsagePerPart = -1;
72+
73+
private int maxParts = -1;
74+
75+
private boolean streaming;
76+
77+
private Scheduler blockingOperationScheduler = Schedulers.newBoundedElastic(Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
78+
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, IDENTIFIER, 60, true);
79+
80+
private Mono<Path> fileStorageDirectory = Mono.defer(this::defaultFileStorageDirectory).cache();
81+
82+
83+
/**
84+
* Configure the maximum amount of memory that is allowed per headers section of each part.
85+
* When the limit
86+
* @param byteCount the maximum amount of memory for headers
87+
*/
88+
public void setMaxHeadersSize(int byteCount) {
89+
this.maxHeadersSize = byteCount;
90+
}
91+
92+
/**
93+
* Get the {@link #setMaxInMemorySize configured} maximum in-memory size.
94+
*/
95+
public int getMaxInMemorySize() {
96+
return this.maxInMemorySize;
97+
}
98+
99+
/**
100+
* Configure the maximum amount of memory allowed per part.
101+
* When the limit is exceeded:
102+
* <ul>
103+
* <li>file parts are written to a temporary file.
104+
* <li>non-file parts are rejected with {@link DataBufferLimitException}.
105+
* </ul>
106+
* <p>By default this is set to 256K.
107+
* <p>Note that this property is ignored when
108+
* {@linkplain #setStreaming(boolean) streaming} is enabled.
109+
* @param maxInMemorySize the in-memory limit in bytes; if set to -1 the entire
110+
* contents will be stored in memory
111+
*/
112+
public void setMaxInMemorySize(int maxInMemorySize) {
113+
this.maxInMemorySize = maxInMemorySize;
114+
}
115+
116+
/**
117+
* Configure the maximum amount of disk space allowed for file parts.
118+
* <p>By default this is set to -1, meaning that there is no maximum.
119+
* <p>Note that this property is ignored when
120+
* {@linkplain #setStreaming(boolean) streaming} is enabled, , or when
121+
* {@link #setMaxInMemorySize(int) maxInMemorySize} is set to -1.
122+
*/
123+
public void setMaxDiskUsagePerPart(long maxDiskUsagePerPart) {
124+
this.maxDiskUsagePerPart = maxDiskUsagePerPart;
125+
}
126+
127+
/**
128+
* Specify the maximum number of parts allowed in a given multipart request.
129+
* <p>By default this is set to -1, meaning that there is no maximum.
130+
*/
131+
public void setMaxParts(int maxParts) {
132+
this.maxParts = maxParts;
133+
}
134+
135+
/**
136+
* Sets the directory used to store parts larger than
137+
* {@link #setMaxInMemorySize(int) maxInMemorySize}. By default, a directory
138+
* named {@code spring-webflux-multipart} is created under the system
139+
* temporary directory.
140+
* <p>Note that this property is ignored when
141+
* {@linkplain #setStreaming(boolean) streaming} is enabled, or when
142+
* {@link #setMaxInMemorySize(int) maxInMemorySize} is set to -1.
143+
* @throws IOException if an I/O error occurs, or the parent directory
144+
* does not exist
145+
*/
146+
public void setFileStorageDirectory(Path fileStorageDirectory) throws IOException {
147+
Assert.notNull(fileStorageDirectory, "FileStorageDirectory must not be null");
148+
if (!Files.exists(fileStorageDirectory)) {
149+
Files.createDirectory(fileStorageDirectory);
150+
}
151+
this.fileStorageDirectory = Mono.just(fileStorageDirectory);
152+
}
153+
154+
/**
155+
* Sets the Reactor {@link Scheduler} to be used for creating files and
156+
* directories, and writing to files. By default, a bounded scheduler is
157+
* created with default properties.
158+
* <p>Note that this property is ignored when
159+
* {@linkplain #setStreaming(boolean) streaming} is enabled, or when
160+
* {@link #setMaxInMemorySize(int) maxInMemorySize} is set to -1.
161+
* @see Schedulers#newBoundedElastic
162+
*/
163+
public void setBlockingOperationScheduler(Scheduler blockingOperationScheduler) {
164+
Assert.notNull(blockingOperationScheduler, "FileCreationScheduler must not be null");
165+
this.blockingOperationScheduler = blockingOperationScheduler;
166+
}
167+
168+
/**
169+
* When set to {@code true}, the {@linkplain Part#content() part content}
170+
* is streamed directly from the parsed input buffer stream, and not stored
171+
* in memory nor file.
172+
* When {@code false}, parts are backed by
173+
* in-memory and/or file storage. Defaults to {@code false}.
174+
*
175+
* <p><strong>NOTE</strong> that with streaming enabled, the
176+
* {@code Flux<Part>} that is produced by this message reader must be
177+
* consumed in the original order, i.e. the order of the HTTP message.
178+
* Additionally, the {@linkplain Part#content() body contents} must either
179+
* be completely consumed or canceled before moving to the next part.
180+
*
181+
* <p>Also note that enabling this property effectively ignores
182+
* {@link #setMaxInMemorySize(int) maxInMemorySize},
183+
* {@link #setMaxDiskUsagePerPart(long) maxDiskUsagePerPart},
184+
* {@link #setFileStorageDirectory(Path) fileStorageDirectory}, and
185+
* {@link #setBlockingOperationScheduler(Scheduler) fileCreationScheduler}.
186+
*/
187+
public void setStreaming(boolean streaming) {
188+
this.streaming = streaming;
189+
}
190+
191+
@Override
192+
public List<MediaType> getReadableMediaTypes() {
193+
return Collections.singletonList(MediaType.MULTIPART_FORM_DATA);
194+
}
195+
196+
@Override
197+
public boolean canRead(ResolvableType elementType, @Nullable MediaType mediaType) {
198+
return Part.class.equals(elementType.toClass()) &&
199+
(mediaType == null || MediaType.MULTIPART_FORM_DATA.isCompatibleWith(mediaType));
200+
}
201+
202+
@Override
203+
public Mono<Part> readMono(ResolvableType elementType, ReactiveHttpInputMessage message,
204+
Map<String, Object> hints) {
205+
return Mono.error(new UnsupportedOperationException("Cannot read multipart request body into single Part"));
206+
}
207+
208+
@Override
209+
public Flux<Part> read(ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
210+
return Flux.defer(() -> {
211+
byte[] boundary = boundary(message);
212+
if (boundary == null) {
213+
return Flux.error(new DecodingException("No multipart boundary found in Content-Type: \"" +
214+
message.getHeaders().getContentType() + "\""));
215+
}
216+
Flux<MultipartParser.Token> tokens = MultipartParser.parse(message.getBody(), boundary,
217+
this.maxHeadersSize);
218+
219+
return PartGenerator.createParts(tokens, this.maxParts, this.maxInMemorySize, this.maxDiskUsagePerPart,
220+
this.streaming, this.fileStorageDirectory, this.blockingOperationScheduler);
221+
});
222+
}
223+
224+
@Nullable
225+
private static byte[] boundary(HttpMessage message) {
226+
MediaType contentType = message.getHeaders().getContentType();
227+
if (contentType != null) {
228+
String boundary = contentType.getParameter("boundary");
229+
if (boundary != null) {
230+
return boundary.getBytes(StandardCharsets.ISO_8859_1);
231+
}
232+
}
233+
return null;
234+
}
235+
236+
@SuppressWarnings("BlockingMethodInNonBlockingContext")
237+
private Mono<Path> defaultFileStorageDirectory() {
238+
return Mono.fromCallable(() -> {
239+
Path tempDirectory = Paths.get(System.getProperty("java.io.tmpdir"), IDENTIFIER);
240+
if (!Files.exists(tempDirectory)) {
241+
Files.createDirectory(tempDirectory);
242+
}
243+
return tempDirectory;
244+
}).subscribeOn(this.blockingOperationScheduler);
245+
246+
}
247+
248+
}

0 commit comments

Comments
 (0)