11/*
2- * Copyright 2002-2025 the original author or authors.
2+ * Copyright 2002-2024 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
2121import java .util .LinkedHashSet ;
2222import java .util .List ;
2323import java .util .Set ;
24- import java .util .concurrent .atomic .AtomicReference ;
2524import java .util .function .Consumer ;
2625
2726import org .jspecify .annotations .Nullable ;
@@ -72,19 +71,20 @@ public class ResponseBodyEmitter {
7271
7372 private @ Nullable Handler handler ;
7473
75- private final AtomicReference <State > state = new AtomicReference <>(State .START );
76-
7774 /** Store send data before handler is initialized. */
7875 private final Set <DataWithMediaType > earlySendAttempts = new LinkedHashSet <>(8 );
7976
77+ /** Store successful completion before the handler is initialized. */
78+ private boolean complete ;
79+
8080 /** Store an error before the handler is initialized. */
8181 private @ Nullable Throwable failure ;
8282
83- private final TimeoutCallback timeoutCallback = new TimeoutCallback ();
83+ private final DefaultCallback timeoutCallback = new DefaultCallback ();
8484
8585 private final ErrorCallback errorCallback = new ErrorCallback ();
8686
87- private final CompletionCallback completionCallback = new CompletionCallback ();
87+ private final DefaultCallback completionCallback = new DefaultCallback ();
8888
8989
9090 /**
@@ -124,7 +124,7 @@ synchronized void initialize(Handler handler) throws IOException {
124124 this .earlySendAttempts .clear ();
125125 }
126126
127- if (this .state . get () == State . COMPLETE ) {
127+ if (this .complete ) {
128128 if (this .failure != null ) {
129129 this .handler .completeWithError (this .failure );
130130 }
@@ -139,12 +139,11 @@ synchronized void initialize(Handler handler) throws IOException {
139139 }
140140 }
141141
142- void initializeWithError (Throwable ex ) {
143- if (this .state .compareAndSet (State .START , State .COMPLETE )) {
144- this .failure = ex ;
145- this .earlySendAttempts .clear ();
146- this .errorCallback .accept (ex );
147- }
142+ synchronized void initializeWithError (Throwable ex ) {
143+ this .complete = true ;
144+ this .failure = ex ;
145+ this .earlySendAttempts .clear ();
146+ this .errorCallback .accept (ex );
148147 }
149148
150149 /**
@@ -182,7 +181,8 @@ public void send(Object object) throws IOException {
182181 * @throws java.lang.IllegalStateException wraps any other errors
183182 */
184183 public synchronized void send (Object object , @ Nullable MediaType mediaType ) throws IOException {
185- assertNotComplete ();
184+ Assert .state (!this .complete , () -> "ResponseBodyEmitter has already completed" +
185+ (this .failure != null ? " with error: " + this .failure : "" ));
186186 if (this .handler != null ) {
187187 try {
188188 this .handler .send (object , mediaType );
@@ -209,13 +209,9 @@ public synchronized void send(Object object, @Nullable MediaType mediaType) thro
209209 * @since 6.0.12
210210 */
211211 public synchronized void send (Set <DataWithMediaType > items ) throws IOException {
212- assertNotComplete ();
213- sendInternal (items );
214- }
215-
216- private void assertNotComplete () {
217- Assert .state (this .state .get () == State .START , () -> "ResponseBodyEmitter has already completed" +
212+ Assert .state (!this .complete , () -> "ResponseBodyEmitter has already completed" +
218213 (this .failure != null ? " with error: " + this .failure : "" ));
214+ sendInternal (items );
219215 }
220216
221217 private void sendInternal (Set <DataWithMediaType > items ) throws IOException {
@@ -246,8 +242,9 @@ private void sendInternal(Set<DataWithMediaType> items) throws IOException {
246242 * to complete request processing. It should not be used after container
247243 * related events such as an error while {@link #send(Object) sending}.
248244 */
249- public void complete () {
250- if (trySetComplete () && this .handler != null ) {
245+ public synchronized void complete () {
246+ this .complete = true ;
247+ if (this .handler != null ) {
251248 this .handler .complete ();
252249 }
253250 }
@@ -263,26 +260,20 @@ public void complete() {
263260 * container related events such as an error while
264261 * {@link #send(Object) sending}.
265262 */
266- public void completeWithError (Throwable ex ) {
267- if (trySetComplete ()) {
268- this .failure = ex ;
269- if (this .handler != null ) {
270- this .handler .completeWithError (ex );
271- }
263+ public synchronized void completeWithError (Throwable ex ) {
264+ this .complete = true ;
265+ this .failure = ex ;
266+ if (this .handler != null ) {
267+ this .handler .completeWithError (ex );
272268 }
273269 }
274270
275- private boolean trySetComplete () {
276- return (this .state .compareAndSet (State .START , State .COMPLETE ) ||
277- (this .state .compareAndSet (State .TIMEOUT , State .COMPLETE )));
278- }
279-
280271 /**
281272 * Register code to invoke when the async request times out. This method is
282273 * called from a container thread when an async request times out.
283274 * <p>As of 6.2, one can register multiple callbacks for this event.
284275 */
285- public void onTimeout (Runnable callback ) {
276+ public synchronized void onTimeout (Runnable callback ) {
286277 this .timeoutCallback .addDelegate (callback );
287278 }
288279
@@ -293,7 +284,7 @@ public void onTimeout(Runnable callback) {
293284 * <p>As of 6.2, one can register multiple callbacks for this event.
294285 * @since 5.0
295286 */
296- public void onError (Consumer <Throwable > callback ) {
287+ public synchronized void onError (Consumer <Throwable > callback ) {
297288 this .errorCallback .addDelegate (callback );
298289 }
299290
@@ -304,7 +295,7 @@ public void onError(Consumer<Throwable> callback) {
304295 * detecting that a {@code ResponseBodyEmitter} instance is no longer usable.
305296 * <p>As of 6.2, one can register multiple callbacks for this event.
306297 */
307- public void onCompletion (Runnable callback ) {
298+ public synchronized void onCompletion (Runnable callback ) {
308299 this .completionCallback .addDelegate (callback );
309300 }
310301
@@ -371,80 +362,39 @@ public Object getData() {
371362 }
372363
373364
374- private class TimeoutCallback implements Runnable {
365+ private class DefaultCallback implements Runnable {
375366
376- private final List <Runnable > delegates = new ArrayList <>(1 );
367+ private List <Runnable > delegates = new ArrayList <>(1 );
377368
378- public synchronized void addDelegate (Runnable delegate ) {
369+ public void addDelegate (Runnable delegate ) {
379370 this .delegates .add (delegate );
380371 }
381372
382373 @ Override
383374 public void run () {
384- if (ResponseBodyEmitter .this .state .compareAndSet (State .START , State .TIMEOUT )) {
385- for (Runnable delegate : this .delegates ) {
386- delegate .run ();
387- }
375+ ResponseBodyEmitter .this .complete = true ;
376+ for (Runnable delegate : this .delegates ) {
377+ delegate .run ();
388378 }
389379 }
390380 }
391381
392382
393383 private class ErrorCallback implements Consumer <Throwable > {
394384
395- private final List <Consumer <Throwable >> delegates = new ArrayList <>(1 );
385+ private List <Consumer <Throwable >> delegates = new ArrayList <>(1 );
396386
397- public synchronized void addDelegate (Consumer <Throwable > callback ) {
387+ public void addDelegate (Consumer <Throwable > callback ) {
398388 this .delegates .add (callback );
399389 }
400390
401391 @ Override
402392 public void accept (Throwable t ) {
403- if (ResponseBodyEmitter .this .state .compareAndSet (State .START , State .COMPLETE )) {
404- for (Consumer <Throwable > delegate : this .delegates ) {
405- delegate .accept (t );
406- }
393+ ResponseBodyEmitter .this .complete = true ;
394+ for (Consumer <Throwable > delegate : this .delegates ) {
395+ delegate .accept (t );
407396 }
408397 }
409398 }
410399
411-
412- private class CompletionCallback implements Runnable {
413-
414- private final List <Runnable > delegates = new ArrayList <>(1 );
415-
416- public synchronized void addDelegate (Runnable delegate ) {
417- this .delegates .add (delegate );
418- }
419-
420- @ Override
421- public void run () {
422- if (ResponseBodyEmitter .this .state .compareAndSet (State .START , State .COMPLETE )) {
423- for (Runnable delegate : this .delegates ) {
424- delegate .run ();
425- }
426- }
427- }
428- }
429-
430-
431- /**
432- * Represents a state for {@link ResponseBodyEmitter}.
433- * <p><pre>
434- * START ----+
435- * | |
436- * v |
437- * TIMEOUT |
438- * | |
439- * v |
440- * COMPLETE <--+
441- * </pre>
442- * @since 6.2.4
443- */
444- private enum State {
445- START ,
446- TIMEOUT , // handling a timeout
447- COMPLETE
448- }
449-
450400}
0 commit comments