52
52
import org .springframework .http .ReactiveHttpInputMessage ;
53
53
import org .springframework .http .codec .HttpMessageReader ;
54
54
import org .springframework .util .Assert ;
55
- import org .springframework .util .MimeType ;
56
- import org .springframework .util .StreamUtils ;
57
55
58
56
/**
59
57
* {@code HttpMessageReader} for parsing {@code "multipart/form-data"} requests
71
69
*/
72
70
public class SynchronossPartHttpMessageReader implements HttpMessageReader <Part > {
73
71
72
+ private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory ();
73
+
74
74
75
75
@ Override
76
76
public List <MediaType > getReadableMediaTypes () {
@@ -88,7 +88,7 @@ public boolean canRead(ResolvableType elementType, MediaType mediaType) {
88
88
public Flux <Part > read (ResolvableType elementType , ReactiveHttpInputMessage message ,
89
89
Map <String , Object > hints ) {
90
90
91
- return Flux .create (new SynchronossPartGenerator (message ));
91
+ return Flux .create (new SynchronossPartGenerator (message , this . bufferFactory ));
92
92
}
93
93
94
94
@@ -109,17 +109,20 @@ private static class SynchronossPartGenerator implements Consumer<FluxSink<Part>
109
109
110
110
private final ReactiveHttpInputMessage inputMessage ;
111
111
112
+ private final DataBufferFactory bufferFactory ;
113
+
112
114
113
- SynchronossPartGenerator (ReactiveHttpInputMessage inputMessage ) {
115
+ SynchronossPartGenerator (ReactiveHttpInputMessage inputMessage , DataBufferFactory factory ) {
114
116
this .inputMessage = inputMessage ;
117
+ this .bufferFactory = factory ;
115
118
}
116
119
117
120
118
121
@ Override
119
122
public void accept (FluxSink <Part > emitter ) {
120
123
121
124
MultipartContext context = createMultipartContext ();
122
- NioMultipartParserListener listener = new FluxSinkAdapterListener (emitter );
125
+ NioMultipartParserListener listener = new FluxSinkAdapterListener (emitter , this . bufferFactory );
123
126
NioMultipartParser parser = Multipart .multipart (context ).forNIO (listener );
124
127
125
128
this .inputMessage .getBody ().subscribe (buffer -> {
@@ -167,26 +170,32 @@ private static class FluxSinkAdapterListener implements NioMultipartParserListen
167
170
168
171
private final FluxSink <Part > sink ;
169
172
173
+ private final DataBufferFactory bufferFactory ;
174
+
170
175
private final AtomicInteger terminated = new AtomicInteger (0 );
171
176
172
177
173
- FluxSinkAdapterListener (FluxSink <Part > sink ) {
178
+ FluxSinkAdapterListener (FluxSink <Part > sink , DataBufferFactory bufferFactory ) {
174
179
this .sink = sink ;
180
+ this .bufferFactory = bufferFactory ;
175
181
}
176
182
177
183
178
184
@ Override
179
185
public void onPartFinished (StreamStorage storage , Map <String , List <String >> headers ) {
180
186
HttpHeaders httpHeaders = new HttpHeaders ();
181
187
httpHeaders .putAll (headers );
182
- this .sink .next (new SynchronossPart (httpHeaders , storage ));
188
+ Part part = MultipartUtils .getFileName (httpHeaders ) != null ?
189
+ new SynchronossFilePart (httpHeaders , storage , this .bufferFactory ) :
190
+ new DefaultSynchronossPart (httpHeaders , storage , this .bufferFactory );
191
+ this .sink .next (part );
183
192
}
184
193
185
194
@ Override
186
195
public void onFormFieldPartFinished (String name , String value , Map <String , List <String >> headers ) {
187
196
HttpHeaders httpHeaders = new HttpHeaders ();
188
197
httpHeaders .putAll (headers );
189
- this .sink .next (new SynchronossPart (httpHeaders , value ));
198
+ this .sink .next (new SynchronossFormFieldPart (httpHeaders , this . bufferFactory , value ));
190
199
}
191
200
192
201
@ Override
@@ -213,31 +222,18 @@ public void onNestedPartFinished() {
213
222
}
214
223
215
224
216
- private static class SynchronossPart implements Part {
225
+ private static abstract class AbstractSynchronossPart implements Part {
217
226
218
227
private final HttpHeaders headers ;
219
228
220
- private final StreamStorage storage ;
221
-
222
- private final String content ;
223
-
224
- private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory ();
225
-
229
+ private final DataBufferFactory bufferFactory ;
226
230
227
- SynchronossPart (HttpHeaders headers , StreamStorage storage ) {
228
- Assert .notNull (headers , "HttpHeaders is required" );
229
- Assert .notNull (storage , "'storage' is required" );
230
- this .headers = headers ;
231
- this .storage = storage ;
232
- this .content = null ;
233
- }
234
231
235
- SynchronossPart (HttpHeaders headers , String content ) {
232
+ AbstractSynchronossPart (HttpHeaders headers , DataBufferFactory bufferFactory ) {
236
233
Assert .notNull (headers , "HttpHeaders is required" );
237
- Assert .notNull (content , "'content ' is required" );
234
+ Assert .notNull (bufferFactory , "'bufferFactory ' is required" );
238
235
this .headers = headers ;
239
- this .storage = null ;
240
- this .content = content ;
236
+ this .bufferFactory = bufferFactory ;
241
237
}
242
238
243
239
@@ -251,52 +247,53 @@ public HttpHeaders getHeaders() {
251
247
return this .headers ;
252
248
}
253
249
254
- @ Override
255
- public Optional <String > getFilename () {
256
- return Optional .ofNullable (MultipartUtils .getFileName (this .headers ));
250
+ protected DataBufferFactory getBufferFactory () {
251
+ return this .bufferFactory ;
257
252
}
253
+ }
258
254
259
- @ Override
260
- public Mono <String > getContentAsString () {
261
- if (this .content != null ) {
262
- return Mono .just (this .content );
263
- }
264
- try {
265
- InputStream inputStream = this .storage .getInputStream ();
266
- Charset charset = getCharset ();
267
- return Mono .just (StreamUtils .copyToString (inputStream , charset ));
268
- }
269
- catch (IOException e ) {
270
- return Mono .error (new IllegalStateException (
271
- "Error while reading part content as a string" , e ));
272
- }
273
- }
255
+ private static class DefaultSynchronossPart extends AbstractSynchronossPart {
274
256
275
- private Charset getCharset () {
276
- return Optional .ofNullable (this .headers .getContentType ())
277
- .map (MimeType ::getCharset ).orElse (StandardCharsets .UTF_8 );
257
+ private final StreamStorage storage ;
258
+
259
+
260
+ DefaultSynchronossPart (HttpHeaders headers , StreamStorage storage , DataBufferFactory factory ) {
261
+ super (headers , factory );
262
+ Assert .notNull (storage , "'storage' is required" );
263
+ this .storage = storage ;
278
264
}
279
265
266
+
280
267
@ Override
281
268
public Flux <DataBuffer > getContent () {
282
- if (this .content != null ) {
283
- DataBuffer buffer = this .bufferFactory .allocateBuffer (this .content .length ());
284
- buffer .write (this .content .getBytes ());
285
- return Flux .just (buffer );
286
- }
287
269
InputStream inputStream = this .storage .getInputStream ();
288
- return DataBufferUtils .read (inputStream , this .bufferFactory , 4096 );
270
+ return DataBufferUtils .read (inputStream , getBufferFactory (), 4096 );
271
+ }
272
+
273
+ protected StreamStorage getStorage () {
274
+ return this .storage ;
275
+ }
276
+ }
277
+
278
+ private static class SynchronossFilePart extends DefaultSynchronossPart implements FilePart {
279
+
280
+
281
+ public SynchronossFilePart (HttpHeaders headers , StreamStorage storage , DataBufferFactory factory ) {
282
+ super (headers , storage , factory );
283
+ }
284
+
285
+
286
+ @ Override
287
+ public String getFilename () {
288
+ return MultipartUtils .getFileName (getHeaders ());
289
289
}
290
290
291
291
@ Override
292
292
public Mono <Void > transferTo (File destination ) {
293
- if (this .storage == null || !getFilename ().isPresent ()) {
294
- return Mono .error (new IllegalStateException ("The part does not represent a file." ));
295
- }
296
293
ReadableByteChannel input = null ;
297
294
FileChannel output = null ;
298
295
try {
299
- input = Channels .newChannel (this . storage .getInputStream ());
296
+ input = Channels .newChannel (getStorage () .getInputStream ());
300
297
output = new FileOutputStream (destination ).getChannel ();
301
298
302
299
long size = (input instanceof FileChannel ? ((FileChannel ) input ).size () : Long .MAX_VALUE );
@@ -332,4 +329,34 @@ public Mono<Void> transferTo(File destination) {
332
329
}
333
330
}
334
331
332
+ private static class SynchronossFormFieldPart extends AbstractSynchronossPart implements FormFieldPart {
333
+
334
+ private final String content ;
335
+
336
+
337
+ SynchronossFormFieldPart (HttpHeaders headers , DataBufferFactory bufferFactory , String content ) {
338
+ super (headers , bufferFactory );
339
+ this .content = content ;
340
+ }
341
+
342
+
343
+ @ Override
344
+ public String getValue () {
345
+ return this .content ;
346
+ }
347
+
348
+ @ Override
349
+ public Flux <DataBuffer > getContent () {
350
+ byte [] bytes = this .content .getBytes (getCharset ());
351
+ DataBuffer buffer = getBufferFactory ().allocateBuffer (bytes .length );
352
+ buffer .write (bytes );
353
+ return Flux .just (buffer );
354
+ }
355
+
356
+ private Charset getCharset () {
357
+ return Optional .ofNullable (MultipartUtils .getCharEncoding (getHeaders ()))
358
+ .map (Charset ::forName ).orElse (StandardCharsets .UTF_8 );
359
+ }
360
+ }
361
+
335
362
}
0 commit comments