3333import org .springframework .core .ReactiveAdapterRegistry ;
3434import org .springframework .core .io .buffer .DataBuffer ;
3535import org .springframework .core .io .buffer .DataBufferFactory ;
36+ import org .springframework .core .io .buffer .DataBufferUtils ;
3637import org .springframework .http .MediaType ;
3738import org .thymeleaf .IThrottledTemplateProcessor ;
3839import org .thymeleaf .TemplateEngine ;
@@ -181,7 +182,7 @@ private Mono<DataBuffer> createFullStream(
181182 final DataBufferFactory bufferFactory , final Charset charset ) {
182183
183184 final Mono <DataBuffer > stream =
184- Mono .create (
185+ Mono .< DataBuffer > create (
185186 subscriber -> {
186187
187188 if (logger .isTraceEnabled ()) {
@@ -198,6 +199,7 @@ private Mono<DataBuffer> createFullStream(
198199 process (templateName , markupSelectors , context , writer );
199200
200201 } catch (final Throwable t ) {
202+ DataBufferUtils .release (dataBuffer );
201203 logger .error (
202204 String .format (
203205 "[THYMELEAF][%s] Exception processing template \" %s\" : %s" ,
@@ -220,7 +222,8 @@ private Mono<DataBuffer> createFullStream(
220222 // This is a Mono<?>, so no need to call "next()" or "complete()"
221223 subscriber .success (dataBuffer );
222224
223- });
225+ })
226+ .doOnDiscard (DataBuffer .class , DataBufferUtils .releaseConsumer ());
224227
225228 // Will add some logging to the data stream
226229 return stream .log (LOG_CATEGORY_FULL_OUTPUT , Level .FINEST );
@@ -234,7 +237,7 @@ private Flux<DataBuffer> createChunkedStream(
234237 final String templateName , final Set <String > markupSelectors , final IContext context ,
235238 final DataBufferFactory bufferFactory , final Charset charset , final int responseMaxChunkSizeBytes ) {
236239
237- final Flux <DataBuffer > stream = Flux .generate (
240+ final Flux <DataBuffer > stream = Flux .< DataBuffer , StreamThrottledTemplateProcessor > generate (
238241
239242 // Using the throttledProcessor as state in this Flux.generate allows us to delay the
240243 // initialization of the throttled processor until the last moment, when output generation
@@ -265,9 +268,9 @@ private Flux<DataBuffer> createChunkedStream(
265268
266269 final int bytesProduced ;
267270 try {
268- bytesProduced =
269- throttledProcessor .process (responseMaxChunkSizeBytes , buffer .asOutputStream (), charset );
271+ bytesProduced = throttledProcessor .process (responseMaxChunkSizeBytes , buffer .asOutputStream (), charset );
270272 } catch (final Throwable t ) {
273+ DataBufferUtils .release (buffer );
271274 emitter .error (t );
272275 return null ;
273276 }
@@ -304,7 +307,8 @@ private Flux<DataBuffer> createChunkedStream(
304307
305308 return throttledProcessor ;
306309
307- });
310+ })
311+ .doOnDiscard (DataBuffer .class , DataBufferUtils .releaseConsumer ());
308312
309313 // Will add some logging to the data stream
310314 return stream .log (LOG_CATEGORY_CHUNKED_OUTPUT , Level .FINEST );
@@ -419,7 +423,7 @@ private Flux<DataBuffer> createDataDrivenStream(
419423 // STEP 5: React to each buffer of published data by creating one or many (concatMap) DataBuffers containing
420424 // the result of processing only that buffer.
421425 final Flux <DataBuffer > stream = dataDrivenWithContextStream .concatMap (
422- (step ) -> Flux .generate (
426+ (step ) -> Flux .< DataBuffer , Boolean > generate (
423427
424428 // We set initialize to TRUE as a state, so that the first step executed for this Flux
425429 // performs the initialization of the dataDrivenIterator for the entire Flux. It is a need
@@ -490,11 +494,9 @@ private Flux<DataBuffer> createDataDrivenStream(
490494
491495 final int bytesProduced ;
492496 try {
493-
494- bytesProduced =
495- throttledProcessor .process (responseMaxChunkSizeBytes , buffer .asOutputStream (), charset );
496-
497+ bytesProduced = throttledProcessor .process (responseMaxChunkSizeBytes , buffer .asOutputStream (), charset );
497498 } catch (final Throwable t ) {
499+ DataBufferUtils .release (buffer );
498500 emitter .error (t );
499501 return Boolean .FALSE ;
500502 }
@@ -582,7 +584,9 @@ private Flux<DataBuffer> createDataDrivenStream(
582584
583585 return Boolean .FALSE ;
584586
585- }));
587+ })
588+ .doOnDiscard (DataBuffer .class , DataBufferUtils .releaseConsumer ())
589+ );
586590
587591
588592 // Will add some logging to the data flow
0 commit comments