21
21
import java .nio .charset .StandardCharsets ;
22
22
import java .util .ArrayList ;
23
23
import java .util .Arrays ;
24
+ import java .util .Collection ;
25
+ import java .util .Collections ;
24
26
import java .util .List ;
25
27
import java .util .Map ;
26
28
import java .util .concurrent .ConcurrentHashMap ;
27
29
import java .util .concurrent .ConcurrentMap ;
28
- import java .util .function .Consumer ;
29
30
30
31
import org .reactivestreams .Publisher ;
31
32
import reactor .core .publisher .Flux ;
33
+ import reactor .core .publisher .Mono ;
32
34
33
35
import org .springframework .core .ResolvableType ;
34
36
import org .springframework .core .io .buffer .DataBuffer ;
35
- import org .springframework .core .io .buffer .DataBufferLimitException ;
36
37
import org .springframework .core .io .buffer .DataBufferUtils ;
37
- import org .springframework .core .io .buffer .DataBufferWrapper ;
38
- import org .springframework .core .io .buffer .DefaultDataBufferFactory ;
39
38
import org .springframework .core .io .buffer .LimitedDataBufferList ;
40
39
import org .springframework .core .io .buffer .PooledDataBuffer ;
41
40
import org .springframework .core .log .LogFormatUtils ;
45
44
import org .springframework .util .MimeTypeUtils ;
46
45
47
46
/**
48
- * Decode from a data buffer stream to a {@code String} stream. Before decoding, this decoder
49
- * realigns the incoming data buffers so that each buffer ends with a newline.
50
- * This is to make sure that multibyte characters are decoded properly, and do not cross buffer
51
- * boundaries. The default delimiters ({@code \n}, {@code \r\n}) can be customized.
52
- *
53
- * <p>Partially inspired by Netty's {@code DelimiterBasedFrameDecoder} .
47
+ * Decode from a data buffer stream to a {@code String} stream, either splitting
48
+ * or aggregating incoming data chunks to realign along newlines delimiters
49
+ * and produce a stream of strings. This is useful for streaming but is also
50
+ * necessary to ensure that that multibyte characters can be decoded correctly,
51
+ * avoiding split-character issues. The default delimiters used by default are
52
+ * {@code \n} and {@code \r\n} but that can be customized .
54
53
*
55
54
* @author Sebastien Deleuze
56
55
* @author Brian Clozel
@@ -115,21 +114,22 @@ public Flux<String> decode(Publisher<DataBuffer> input, ResolvableType elementTy
115
114
116
115
byte [][] delimiterBytes = getDelimiterBytes (mimeType );
117
116
118
- Flux <DataBuffer > inputFlux = Flux .defer (() -> {
119
- DataBufferUtils .Matcher matcher = DataBufferUtils .matcher (delimiterBytes );
120
-
121
- @ SuppressWarnings ("MismatchedQueryAndUpdateOfCollection" )
122
- LimitChecker limiter = new LimitChecker (getMaxInMemorySize ());
123
-
124
- return Flux .from (input )
125
- .concatMapIterable (buffer -> endFrameAfterDelimiter (buffer , matcher ))
126
- .doOnNext (limiter )
127
- .bufferUntil (buffer -> buffer instanceof EndFrameBuffer )
128
- .map (list -> joinAndStrip (list , this .stripDelimiter ))
129
- .doOnDiscard (PooledDataBuffer .class , DataBufferUtils ::release );
130
- });
131
-
132
- return super .decode (inputFlux , elementType , mimeType , hints );
117
+ LimitedDataBufferList chunks = new LimitedDataBufferList (getMaxInMemorySize ());
118
+ DataBufferUtils .Matcher matcher = DataBufferUtils .matcher (delimiterBytes );
119
+
120
+ return Flux .from (input )
121
+ .concatMapIterable (buffer -> processDataBuffer (buffer , matcher , chunks ))
122
+ .concatWith (Mono .defer (() -> {
123
+ if (chunks .isEmpty ()) {
124
+ return Mono .empty ();
125
+ }
126
+ DataBuffer lastBuffer = chunks .get (0 ).factory ().join (chunks );
127
+ chunks .clear ();
128
+ return Mono .just (lastBuffer );
129
+ }))
130
+ .doOnTerminate (chunks ::releaseAndClear )
131
+ .doOnDiscard (PooledDataBuffer .class , PooledDataBuffer ::release )
132
+ .map (buffer -> decode (buffer , elementType , mimeType , hints ));
133
133
}
134
134
135
135
private byte [][] getDelimiterBytes (@ Nullable MimeType mimeType ) {
@@ -142,6 +142,43 @@ private byte[][] getDelimiterBytes(@Nullable MimeType mimeType) {
142
142
});
143
143
}
144
144
145
+ private Collection <DataBuffer > processDataBuffer (
146
+ DataBuffer buffer , DataBufferUtils .Matcher matcher , LimitedDataBufferList chunks ) {
147
+
148
+ try {
149
+ List <DataBuffer > result = null ;
150
+ do {
151
+ int endIndex = matcher .match (buffer );
152
+ if (endIndex == -1 ) {
153
+ chunks .add (buffer );
154
+ DataBufferUtils .retain (buffer ); // retain after add (may raise DataBufferLimitException)
155
+ break ;
156
+ }
157
+ int startIndex = buffer .readPosition ();
158
+ int length = (endIndex - startIndex + 1 );
159
+ DataBuffer slice = buffer .retainedSlice (startIndex , length );
160
+ if (this .stripDelimiter ) {
161
+ slice .writePosition (slice .writePosition () - matcher .delimiter ().length );
162
+ }
163
+ result = (result != null ? result : new ArrayList <>());
164
+ if (chunks .isEmpty ()) {
165
+ result .add (slice );
166
+ }
167
+ else {
168
+ chunks .add (slice );
169
+ result .add (buffer .factory ().join (chunks ));
170
+ chunks .clear ();
171
+ }
172
+ buffer .readPosition (endIndex + 1 );
173
+ }
174
+ while (buffer .readableByteCount () > 0 );
175
+ return (result != null ? result : Collections .emptyList ());
176
+ }
177
+ finally {
178
+ DataBufferUtils .release (buffer );
179
+ }
180
+ }
181
+
145
182
@ Override
146
183
public String decode (DataBuffer dataBuffer , ResolvableType elementType ,
147
184
@ Nullable MimeType mimeType , @ Nullable Map <String , Object > hints ) {
@@ -166,68 +203,6 @@ private Charset getCharset(@Nullable MimeType mimeType) {
166
203
}
167
204
}
168
205
169
- /**
170
- * Finds the first match and longest delimiter, {@link EndFrameBuffer} just after it.
171
- * @param dataBuffer the buffer to find delimiters in
172
- * @param matcher used to find the first delimiters
173
- * @return a flux of buffers, containing {@link EndFrameBuffer} after each delimiter that was
174
- * found in {@code dataBuffer}. Returns Flux, because returning List (w/ flatMapIterable)
175
- * results in memory leaks due to pre-fetching.
176
- */
177
- private static List <DataBuffer > endFrameAfterDelimiter (DataBuffer dataBuffer , DataBufferUtils .Matcher matcher ) {
178
- List <DataBuffer > result = new ArrayList <>();
179
- try {
180
- do {
181
- int endIdx = matcher .match (dataBuffer );
182
- if (endIdx != -1 ) {
183
- int readPosition = dataBuffer .readPosition ();
184
- int length = (endIdx - readPosition + 1 );
185
- DataBuffer slice = dataBuffer .retainedSlice (readPosition , length );
186
- result .add (slice );
187
- result .add (new EndFrameBuffer (matcher .delimiter ()));
188
- dataBuffer .readPosition (endIdx + 1 );
189
- }
190
- else {
191
- result .add (DataBufferUtils .retain (dataBuffer ));
192
- break ;
193
- }
194
- }
195
- while (dataBuffer .readableByteCount () > 0 );
196
- }
197
- finally {
198
- DataBufferUtils .release (dataBuffer );
199
- }
200
- return result ;
201
- }
202
-
203
- /**
204
- * Joins the given list of buffers. If the list ends with a {@link EndFrameBuffer}, it is
205
- * removed. If {@code stripDelimiter} is {@code true} and the resulting buffer ends with
206
- * a delimiter, it is removed.
207
- * @param dataBuffers the data buffers to join
208
- * @param stripDelimiter whether to strip the delimiter
209
- * @return the joined buffer
210
- */
211
- private static DataBuffer joinAndStrip (List <DataBuffer > dataBuffers , boolean stripDelimiter ) {
212
- Assert .state (!dataBuffers .isEmpty (), "DataBuffers should not be empty" );
213
-
214
- byte [] matchingDelimiter = null ;
215
-
216
- int lastIdx = dataBuffers .size () - 1 ;
217
- DataBuffer lastBuffer = dataBuffers .get (lastIdx );
218
- if (lastBuffer instanceof EndFrameBuffer ) {
219
- matchingDelimiter = ((EndFrameBuffer ) lastBuffer ).delimiter ();
220
- dataBuffers .remove (lastIdx );
221
- }
222
-
223
- DataBuffer result = dataBuffers .get (0 ).factory ().join (dataBuffers );
224
- if (stripDelimiter && matchingDelimiter != null ) {
225
- result .writePosition (result .writePosition () - matchingDelimiter .length );
226
- }
227
- return result ;
228
- }
229
-
230
-
231
206
/**
232
207
* Create a {@code StringDecoder} for {@code "text/plain"}.
233
208
* @param stripDelimiter this flag is ignored
@@ -285,46 +260,4 @@ public static StringDecoder allMimeTypes(List<String> delimiters, boolean stripD
285
260
new MimeType ("text" , "plain" , DEFAULT_CHARSET ), MimeTypeUtils .ALL );
286
261
}
287
262
288
-
289
- private static class EndFrameBuffer extends DataBufferWrapper {
290
-
291
- private static final DataBuffer BUFFER = new DefaultDataBufferFactory ().wrap (new byte [0 ]);
292
-
293
- private final byte [] delimiter ;
294
-
295
- public EndFrameBuffer (byte [] delimiter ) {
296
- super (BUFFER );
297
- this .delimiter = delimiter ;
298
- }
299
-
300
- public byte [] delimiter () {
301
- return this .delimiter ;
302
- }
303
- }
304
-
305
-
306
- private static class LimitChecker implements Consumer <DataBuffer > {
307
-
308
- @ SuppressWarnings ("MismatchedQueryAndUpdateOfCollection" )
309
- private final LimitedDataBufferList list ;
310
-
311
- LimitChecker (int maxInMemorySize ) {
312
- this .list = new LimitedDataBufferList (maxInMemorySize );
313
- }
314
-
315
- @ Override
316
- public void accept (DataBuffer buffer ) {
317
- if (buffer instanceof EndFrameBuffer ) {
318
- this .list .clear ();
319
- }
320
- try {
321
- this .list .add (buffer );
322
- }
323
- catch (DataBufferLimitException ex ) {
324
- DataBufferUtils .release (buffer );
325
- throw ex ;
326
- }
327
- }
328
- }
329
-
330
263
}
0 commit comments