30
30
import java .util .Queue ;
31
31
import java .util .concurrent .ConcurrentLinkedQueue ;
32
32
import java .util .concurrent .atomic .AtomicBoolean ;
33
- import java .util .concurrent .atomic .AtomicInteger ;
34
33
import java .util .concurrent .atomic .AtomicLong ;
35
34
import java .util .concurrent .atomic .AtomicReference ;
36
35
41
40
import reactor .core .publisher .Flux ;
42
41
import reactor .core .publisher .FluxSink ;
43
42
import reactor .core .publisher .Mono ;
43
+ import reactor .core .publisher .MonoSink ;
44
44
import reactor .core .scheduler .Scheduler ;
45
45
import reactor .util .context .Context ;
46
46
47
- import org .springframework .core .codec .DecodingException ;
48
47
import org .springframework .core .io .buffer .DataBuffer ;
49
48
import org .springframework .core .io .buffer .DataBufferLimitException ;
50
49
import org .springframework .core .io .buffer .DataBufferUtils ;
@@ -65,13 +64,9 @@ final class PartGenerator extends BaseSubscriber<MultipartParser.Token> {
65
64
66
65
private final AtomicReference <State > state = new AtomicReference <>(new InitialState ());
67
66
68
- private final AtomicInteger partCount = new AtomicInteger ();
69
-
70
67
private final AtomicBoolean requestOutstanding = new AtomicBoolean ();
71
68
72
- private final FluxSink <Part > sink ;
73
-
74
- private final int maxParts ;
69
+ private final MonoSink <Part > sink ;
75
70
76
71
private final boolean streaming ;
77
72
@@ -84,11 +79,10 @@ final class PartGenerator extends BaseSubscriber<MultipartParser.Token> {
84
79
private final Scheduler blockingOperationScheduler ;
85
80
86
81
87
- private PartGenerator (FluxSink <Part > sink , int maxParts , int maxInMemorySize , long maxDiskUsagePerPart ,
82
+ private PartGenerator (MonoSink <Part > sink , int maxInMemorySize , long maxDiskUsagePerPart ,
88
83
boolean streaming , Mono <Path > fileStorageDirectory , Scheduler blockingOperationScheduler ) {
89
84
90
85
this .sink = sink ;
91
- this .maxParts = maxParts ;
92
86
this .maxInMemorySize = maxInMemorySize ;
93
87
this .maxDiskUsagePerPart = maxDiskUsagePerPart ;
94
88
this .streaming = streaming ;
@@ -99,15 +93,15 @@ private PartGenerator(FluxSink<Part> sink, int maxParts, int maxInMemorySize, lo
99
93
/**
100
94
* Creates parts from a given stream of tokens.
101
95
*/
102
- public static Flux <Part > createParts (Flux <MultipartParser .Token > tokens , int maxParts , int maxInMemorySize ,
96
+ public static Mono <Part > createPart (Flux <MultipartParser .Token > tokens , int maxInMemorySize ,
103
97
long maxDiskUsagePerPart , boolean streaming , Mono <Path > fileStorageDirectory ,
104
98
Scheduler blockingOperationScheduler ) {
105
99
106
- return Flux .create (sink -> {
107
- PartGenerator generator = new PartGenerator (sink , maxParts , maxInMemorySize , maxDiskUsagePerPart , streaming ,
100
+ return Mono .create (sink -> {
101
+ PartGenerator generator = new PartGenerator (sink , maxInMemorySize , maxDiskUsagePerPart , streaming ,
108
102
fileStorageDirectory , blockingOperationScheduler );
109
103
110
- sink .onCancel (generator :: onSinkCancel );
104
+ sink .onCancel (generator );
111
105
sink .onRequest (l -> generator .requestToken ());
112
106
tokens .subscribe (generator );
113
107
});
@@ -128,13 +122,6 @@ protected void hookOnNext(MultipartParser.Token token) {
128
122
this .requestOutstanding .set (false );
129
123
State state = this .state .get ();
130
124
if (token instanceof MultipartParser .HeadersToken ) {
131
- // finish previous part
132
- state .partComplete (false );
133
-
134
- if (tooManyParts ()) {
135
- return ;
136
- }
137
-
138
125
newPart (state , token .headers ());
139
126
}
140
127
else {
@@ -144,11 +131,11 @@ protected void hookOnNext(MultipartParser.Token token) {
144
131
145
132
private void newPart (State currentState , HttpHeaders headers ) {
146
133
if (MultipartUtils .isFormField (headers )) {
147
- changeStateInternal ( new FormFieldState (headers ));
134
+ changeState ( currentState , new FormFieldState (headers ));
148
135
requestToken ();
149
136
}
150
137
else if (!this .streaming ) {
151
- changeStateInternal ( new InMemoryState (headers ));
138
+ changeState ( currentState , new InMemoryState (headers ));
152
139
requestToken ();
153
140
}
154
141
else {
@@ -165,7 +152,7 @@ else if (!this.streaming) {
165
152
166
153
@ Override
167
154
protected void hookOnComplete () {
168
- this .state .get ().partComplete ( true );
155
+ this .state .get ().onComplete ( );
169
156
}
170
157
171
158
@ Override
@@ -175,7 +162,8 @@ protected void hookOnError(Throwable throwable) {
175
162
this .sink .error (throwable );
176
163
}
177
164
178
- private void onSinkCancel () {
165
+ @ Override
166
+ public void dispose () {
179
167
changeStateInternal (DisposedState .INSTANCE );
180
168
cancel ();
181
169
}
@@ -211,39 +199,21 @@ void emitPart(Part part) {
211
199
if (logger .isTraceEnabled ()) {
212
200
logger .trace ("Emitting: " + part );
213
201
}
214
- this .sink .next (part );
202
+ this .sink .success (part );
215
203
}
216
204
217
- void emitComplete () {
218
- this .sink .complete ();
219
- }
220
-
221
-
222
205
void emitError (Throwable t ) {
223
206
cancel ();
224
207
this .sink .error (t );
225
208
}
226
209
227
210
void requestToken () {
228
211
if (upstream () != null &&
229
- !this .sink .isCancelled () &&
230
- this .sink .requestedFromDownstream () > 0 &&
231
212
this .requestOutstanding .compareAndSet (false , true )) {
232
213
request (1 );
233
214
}
234
215
}
235
216
236
- private boolean tooManyParts () {
237
- int count = this .partCount .incrementAndGet ();
238
- if (this .maxParts > 0 && count > this .maxParts ) {
239
- emitError (new DecodingException ("Too many parts (" + count + "/" + this .maxParts + " allowed)" ));
240
- return true ;
241
- }
242
- else {
243
- return false ;
244
- }
245
- }
246
-
247
217
/**
248
218
* Represents the internal state of the {@link PartGenerator} for
249
219
* creating a single {@link Part}.
@@ -273,10 +243,8 @@ private interface State {
273
243
274
244
/**
275
245
* Invoked when all tokens for the part have been received.
276
- * @param finalPart {@code true} if this was the last part (and
277
- * {@link #emitComplete()} should be called; {@code false} otherwise
278
246
*/
279
- void partComplete ( boolean finalPart );
247
+ void onComplete ( );
280
248
281
249
/**
282
250
* Invoked when an error has been received.
@@ -307,10 +275,7 @@ public void body(DataBuffer dataBuffer) {
307
275
}
308
276
309
277
@ Override
310
- public void partComplete (boolean finalPart ) {
311
- if (finalPart ) {
312
- emitComplete ();
313
- }
278
+ public void onComplete () {
314
279
}
315
280
316
281
@ Override
@@ -364,13 +329,10 @@ private void store(DataBuffer dataBuffer) {
364
329
}
365
330
366
331
@ Override
367
- public void partComplete ( boolean finalPart ) {
332
+ public void onComplete ( ) {
368
333
byte [] bytes = this .value .toByteArrayUnsafe ();
369
334
String value = new String (bytes , MultipartUtils .charset (this .headers ));
370
335
emitPart (DefaultParts .formFieldPart (this .headers , value ));
371
- if (finalPart ) {
372
- emitComplete ();
373
- }
374
336
}
375
337
376
338
@ Override
@@ -410,13 +372,10 @@ public void body(DataBuffer dataBuffer) {
410
372
}
411
373
412
374
@ Override
413
- public void partComplete ( boolean finalPart ) {
375
+ public void onComplete ( ) {
414
376
if (!this .bodySink .isCancelled ()) {
415
377
this .bodySink .complete ();
416
378
}
417
- if (finalPart ) {
418
- emitComplete ();
419
- }
420
379
}
421
380
422
381
@ Override
@@ -493,11 +452,8 @@ private void switchToFile(DataBuffer current, long byteCount) {
493
452
}
494
453
495
454
@ Override
496
- public void partComplete ( boolean finalPart ) {
455
+ public void onComplete ( ) {
497
456
emitMemoryPart ();
498
- if (finalPart ) {
499
- emitComplete ();
500
- }
501
457
}
502
458
503
459
private void emitMemoryPart () {
@@ -545,8 +501,6 @@ private final class CreateFileState implements State {
545
501
546
502
private volatile boolean completed ;
547
503
548
- private volatile boolean finalPart ;
549
-
550
504
private volatile boolean releaseOnDispose = true ;
551
505
552
506
@@ -563,9 +517,8 @@ public void body(DataBuffer dataBuffer) {
563
517
}
564
518
565
519
@ Override
566
- public void partComplete ( boolean finalPart ) {
520
+ public void onComplete ( ) {
567
521
this .completed = true ;
568
- this .finalPart = finalPart ;
569
522
}
570
523
571
524
public void createFile () {
@@ -597,7 +550,7 @@ private void fileCreated(WritingFileState newState) {
597
550
newState .writeBuffers (this .content );
598
551
599
552
if (this .completed ) {
600
- newState .partComplete ( this . finalPart );
553
+ newState .onComplete ( );
601
554
}
602
555
}
603
556
else {
@@ -665,12 +618,9 @@ public void body(DataBuffer dataBuffer) {
665
618
}
666
619
667
620
@ Override
668
- public void partComplete ( boolean finalPart ) {
621
+ public void onComplete ( ) {
669
622
MultipartUtils .closeChannel (this .channel );
670
623
emitPart (DefaultParts .part (this .headers , this .file , PartGenerator .this .blockingOperationScheduler ));
671
- if (finalPart ) {
672
- emitComplete ();
673
- }
674
624
}
675
625
676
626
@ Override
@@ -701,8 +651,6 @@ private final class WritingFileState implements State {
701
651
702
652
private volatile boolean completed ;
703
653
704
- private volatile boolean finalPart ;
705
-
706
654
707
655
public WritingFileState (CreateFileState state , Path file , WritableByteChannel channel ) {
708
656
this .headers = state .headers ;
@@ -725,9 +673,8 @@ public void body(DataBuffer dataBuffer) {
725
673
}
726
674
727
675
@ Override
728
- public void partComplete ( boolean finalPart ) {
676
+ public void onComplete ( ) {
729
677
this .completed = true ;
730
- this .finalPart = finalPart ;
731
678
}
732
679
733
680
public void writeBuffer (DataBuffer dataBuffer ) {
@@ -752,7 +699,7 @@ public void writeBuffers(Iterable<DataBuffer> dataBuffers) {
752
699
private void writeComplete () {
753
700
IdleFileState newState = new IdleFileState (this );
754
701
if (this .completed ) {
755
- newState .partComplete ( this . finalPart );
702
+ newState .onComplete ( );
756
703
}
757
704
else if (changeState (this , newState )) {
758
705
requestToken ();
@@ -799,7 +746,7 @@ public void body(DataBuffer dataBuffer) {
799
746
}
800
747
801
748
@ Override
802
- public void partComplete ( boolean finalPart ) {
749
+ public void onComplete ( ) {
803
750
}
804
751
805
752
@ Override
0 commit comments