@@ -163,7 +163,6 @@ public boolean canRead(ResolvableType elementType, @Nullable MediaType mediaType
163
163
(mediaType == null || MediaType .MULTIPART_FORM_DATA .isCompatibleWith (mediaType ));
164
164
}
165
165
166
-
167
166
@ Override
168
167
public Flux <Part > read (ResolvableType elementType , ReactiveHttpInputMessage message , Map <String , Object > hints ) {
169
168
return Flux .create (new SynchronossPartGenerator (message ))
@@ -177,13 +176,9 @@ public Flux<Part> read(ResolvableType elementType, ReactiveHttpInputMessage mess
177
176
});
178
177
}
179
178
180
-
181
179
@ Override
182
- public Mono <Part > readMono (
183
- ResolvableType elementType , ReactiveHttpInputMessage message , Map <String , Object > hints ) {
184
-
185
- return Mono .error (new UnsupportedOperationException (
186
- "Cannot read multipart request body into single Part" ));
180
+ public Mono <Part > readMono (ResolvableType elementType , ReactiveHttpInputMessage message , Map <String , Object > hints ) {
181
+ return Mono .error (new UnsupportedOperationException ("Cannot read multipart request body into single Part" ));
187
182
}
188
183
189
184
@@ -197,16 +192,16 @@ private class SynchronossPartGenerator extends BaseSubscriber<DataBuffer> implem
197
192
198
193
private final LimitedPartBodyStreamStorageFactory storageFactory = new LimitedPartBodyStreamStorageFactory ();
199
194
195
+ @ Nullable
200
196
private NioMultipartParserListener listener ;
201
197
198
+ @ Nullable
202
199
private NioMultipartParser parser ;
203
200
204
-
205
201
public SynchronossPartGenerator (ReactiveHttpInputMessage inputMessage ) {
206
202
this .inputMessage = inputMessage ;
207
203
}
208
204
209
-
210
205
@ Override
211
206
public void accept (FluxSink <Part > sink ) {
212
207
HttpHeaders headers = this .inputMessage .getHeaders ();
@@ -229,10 +224,13 @@ public void accept(FluxSink<Part> sink) {
229
224
230
225
@ Override
231
226
protected void hookOnNext (DataBuffer buffer ) {
227
+ Assert .state (this .parser != null && this .listener != null , "Not initialized yet" );
228
+
232
229
int size = buffer .readableByteCount ();
233
230
this .storageFactory .increaseByteCount (size );
234
231
byte [] resultBytes = new byte [size ];
235
232
buffer .read (resultBytes );
233
+
236
234
try {
237
235
this .parser .write (resultBytes );
238
236
}
@@ -249,24 +247,32 @@ protected void hookOnNext(DataBuffer buffer) {
249
247
@ Override
250
248
protected void hookOnError (Throwable ex ) {
251
249
try {
252
- this .parser .close ();
250
+ if (this .parser != null ) {
251
+ this .parser .close ();
252
+ }
253
253
}
254
254
catch (IOException ex2 ) {
255
255
// ignore
256
256
}
257
257
finally {
258
- int index = this .storageFactory .getCurrentPartIndex ();
259
- this .listener .onError ("Failure while parsing part[" + index + "]" , ex );
258
+ if (this .listener != null ) {
259
+ int index = this .storageFactory .getCurrentPartIndex ();
260
+ this .listener .onError ("Failure while parsing part[" + index + "]" , ex );
261
+ }
260
262
}
261
263
}
262
264
263
265
@ Override
264
266
protected void hookFinally (SignalType type ) {
265
267
try {
266
- this .parser .close ();
268
+ if (this .parser != null ) {
269
+ this .parser .close ();
270
+ }
267
271
}
268
272
catch (IOException ex ) {
269
- this .listener .onError ("Error while closing parser" , ex );
273
+ if (this .listener != null ) {
274
+ this .listener .onError ("Error while closing parser" , ex );
275
+ }
270
276
}
271
277
}
272
278
@@ -280,17 +286,16 @@ private int getContentLength(HttpHeaders headers) {
280
286
281
287
private class LimitedPartBodyStreamStorageFactory implements PartBodyStreamStorageFactory {
282
288
283
- private final PartBodyStreamStorageFactory storageFactory = maxInMemorySize > 0 ?
289
+ private final PartBodyStreamStorageFactory storageFactory = ( maxInMemorySize > 0 ?
284
290
new DefaultPartBodyStreamStorageFactory (maxInMemorySize ) :
285
- new DefaultPartBodyStreamStorageFactory ();
291
+ new DefaultPartBodyStreamStorageFactory ()) ;
286
292
287
293
private int index = 1 ;
288
294
289
295
private boolean isFilePart ;
290
296
291
297
private long partSize ;
292
298
293
-
294
299
public int getCurrentPartIndex () {
295
300
return this .index ;
296
301
}
@@ -339,7 +344,6 @@ private static class FluxSinkAdapterListener implements NioMultipartParserListen
339
344
340
345
private final AtomicInteger terminated = new AtomicInteger (0 );
341
346
342
-
343
347
FluxSinkAdapterListener (
344
348
FluxSink <Part > sink , MultipartContext context , LimitedPartBodyStreamStorageFactory factory ) {
345
349
@@ -348,7 +352,6 @@ private static class FluxSinkAdapterListener implements NioMultipartParserListen
348
352
this .storageFactory = factory ;
349
353
}
350
354
351
-
352
355
@ Override
353
356
public void onPartFinished (StreamStorage storage , Map <String , List <String >> headers ) {
354
357
HttpHeaders httpHeaders = new HttpHeaders ();
0 commit comments