diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java index 8c6336e85e6a..f7a3aa218d33 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java @@ -21,6 +21,8 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import org.jspecify.annotations.Nullable; @@ -63,6 +65,7 @@ * @author Rossen Stoyanchev * @author Juergen Hoeller * @author Brian Clozel + * @author Taeik Lim * @since 4.2 */ public class ResponseBodyEmitter { @@ -86,6 +89,8 @@ public class ResponseBodyEmitter { private final DefaultCallback completionCallback = new DefaultCallback(); + /** Guards access to write operations on the response. */ + protected final Lock writeLock = new ReentrantLock(); /** * Create a new ResponseBodyEmitter instance. @@ -114,36 +119,48 @@ public ResponseBodyEmitter(Long timeout) { } - synchronized void initialize(Handler handler) throws IOException { - this.handler = handler; - + void initialize(Handler handler) throws IOException { + this.writeLock.lock(); try { - sendInternal(this.earlySendAttempts); - } - finally { - this.earlySendAttempts.clear(); - } + this.handler = handler; + + try { + sendInternal(this.earlySendAttempts); + } + finally { + this.earlySendAttempts.clear(); + } - if (this.complete) { - if (this.failure != null) { - this.handler.completeWithError(this.failure); + if (this.complete) { + if (this.failure != null) { + this.handler.completeWithError(this.failure); + } + else { + this.handler.complete(); + } } else { - this.handler.complete(); + this.handler.onTimeout(this.timeoutCallback); + this.handler.onError(this.errorCallback); + this.handler.onCompletion(this.completionCallback); } } - else { - this.handler.onTimeout(this.timeoutCallback); - this.handler.onError(this.errorCallback); - this.handler.onCompletion(this.completionCallback); + finally { + this.writeLock.unlock(); } } - synchronized void initializeWithError(Throwable ex) { - this.complete = true; - this.failure = ex; - this.earlySendAttempts.clear(); - this.errorCallback.accept(ex); + void initializeWithError(Throwable ex) { + this.writeLock.lock(); + try { + this.complete = true; + this.failure = ex; + this.earlySendAttempts.clear(); + this.errorCallback.accept(ex); + } + finally { + this.writeLock.unlock(); + } } /** @@ -180,22 +197,28 @@ public void send(Object object) throws IOException { * @throws IOException raised when an I/O error occurs * @throws java.lang.IllegalStateException wraps any other errors */ - public synchronized void send(Object object, @Nullable MediaType mediaType) throws IOException { + public void send(Object object, @Nullable MediaType mediaType) throws IOException { Assert.state(!this.complete, () -> "ResponseBodyEmitter has already completed" + (this.failure != null ? " with error: " + this.failure : "")); - if (this.handler != null) { - try { - this.handler.send(object, mediaType); - } - catch (IOException ex) { - throw ex; + this.writeLock.lock(); + try { + if (this.handler != null) { + try { + this.handler.send(object, mediaType); + } + catch (IOException ex) { + throw ex; + } + catch (Throwable ex) { + throw new IllegalStateException("Failed to send " + object, ex); + } } - catch (Throwable ex) { - throw new IllegalStateException("Failed to send " + object, ex); + else { + this.earlySendAttempts.add(new DataWithMediaType(object, mediaType)); } } - else { - this.earlySendAttempts.add(new DataWithMediaType(object, mediaType)); + finally { + this.writeLock.unlock(); } } @@ -208,10 +231,16 @@ public synchronized void send(Object object, @Nullable MediaType mediaType) thro * @throws java.lang.IllegalStateException wraps any other errors * @since 6.0.12 */ - public synchronized void send(Set items) throws IOException { + public void send(Set items) throws IOException { Assert.state(!this.complete, () -> "ResponseBodyEmitter has already completed" + (this.failure != null ? " with error: " + this.failure : "")); - sendInternal(items); + this.writeLock.lock(); + try { + sendInternal(items); + } + finally { + this.writeLock.unlock(); + } } private void sendInternal(Set items) throws IOException { @@ -242,10 +271,16 @@ private void sendInternal(Set items) throws IOException { * to complete request processing. It should not be used after container * related events such as an error while {@link #send(Object) sending}. */ - public synchronized void complete() { - this.complete = true; - if (this.handler != null) { - this.handler.complete(); + public void complete() { + this.writeLock.lock(); + try { + this.complete = true; + if (this.handler != null) { + this.handler.complete(); + } + } + finally { + this.writeLock.unlock(); } } @@ -260,11 +295,17 @@ public synchronized void complete() { * container related events such as an error while * {@link #send(Object) sending}. */ - public synchronized void completeWithError(Throwable ex) { - this.complete = true; - this.failure = ex; - if (this.handler != null) { - this.handler.completeWithError(ex); + public void completeWithError(Throwable ex) { + this.writeLock.lock(); + try { + this.complete = true; + this.failure = ex; + if (this.handler != null) { + this.handler.completeWithError(ex); + } + } + finally { + this.writeLock.unlock(); } } @@ -273,8 +314,14 @@ public synchronized void completeWithError(Throwable ex) { * called from a container thread when an async request times out. *

As of 6.2, one can register multiple callbacks for this event. */ - public synchronized void onTimeout(Runnable callback) { - this.timeoutCallback.addDelegate(callback); + public void onTimeout(Runnable callback) { + this.writeLock.lock(); + try { + this.timeoutCallback.addDelegate(callback); + } + finally { + this.writeLock.unlock(); + } } /** @@ -284,8 +331,14 @@ public synchronized void onTimeout(Runnable callback) { *

As of 6.2, one can register multiple callbacks for this event. * @since 5.0 */ - public synchronized void onError(Consumer callback) { - this.errorCallback.addDelegate(callback); + public void onError(Consumer callback) { + this.writeLock.lock(); + try { + this.errorCallback.addDelegate(callback); + } + finally { + this.writeLock.unlock(); + } } /** @@ -295,8 +348,14 @@ public synchronized void onError(Consumer callback) { * detecting that a {@code ResponseBodyEmitter} instance is no longer usable. *

As of 6.2, one can register multiple callbacks for this event. */ - public synchronized void onCompletion(Runnable callback) { - this.completionCallback.addDelegate(callback); + public void onCompletion(Runnable callback) { + this.writeLock.lock(); + try { + this.completionCallback.addDelegate(callback); + } + finally { + this.writeLock.unlock(); + } } diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java index 2e3fc8c03173..ca8ff6463205 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java @@ -21,8 +21,6 @@ import java.util.Collections; import java.util.LinkedHashSet; import java.util.Set; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.jspecify.annotations.Nullable; @@ -41,16 +39,13 @@ * @author Juergen Hoeller * @author Sam Brannen * @author Brian Clozel + * @author Taeik Lim * @since 4.2 */ public class SseEmitter extends ResponseBodyEmitter { private static final MediaType TEXT_PLAIN = new MediaType("text", "plain", StandardCharsets.UTF_8); - /** Guards access to write operations on the response. */ - private final Lock writeLock = new ReentrantLock(); - - /** * Create a new SseEmitter instance. */