Skip to content

Commit 08e9372

Browse files
committed
Restore response after beforeCommit action errors
See gh-24186
1 parent 34d3240 commit 08e9372

File tree

2 files changed

+73
-30
lines changed

2 files changed

+73
-30
lines changed

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -176,22 +176,25 @@ public boolean isCommitted() {
176176
@Override
177177
@SuppressWarnings("unchecked")
178178
public final Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
179-
// Write as Mono if possible as an optimization hint to Reactor Netty
180-
// ChannelSendOperator not necessary for Mono
179+
// For Mono we can avoid ChannelSendOperator and Reactor Netty is more optimized for Mono.
180+
// We must resolve value first however, for a chance to handle potential error.
181181
if (body instanceof Mono) {
182-
return ((Mono<? extends DataBuffer>) body).flatMap(buffer ->
183-
doCommit(() -> writeWithInternal(Mono.just(buffer)))
184-
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release))
185-
.doOnError(t -> this.getHeaders().clearContentHeaders());
182+
return ((Mono<? extends DataBuffer>) body)
183+
.flatMap(buffer -> doCommit(() ->
184+
writeWithInternal(Mono.fromCallable(() -> buffer)
185+
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release))))
186+
.doOnError(t -> getHeaders().clearContentHeaders());
187+
}
188+
else {
189+
return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeWithInternal(inner)))
190+
.doOnError(t -> getHeaders().clearContentHeaders());
186191
}
187-
return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeWithInternal(inner)))
188-
.doOnError(t -> this.getHeaders().clearContentHeaders());
189192
}
190193

191194
@Override
192195
public final Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
193196
return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeAndFlushWithInternal(inner)))
194-
.doOnError(t -> this.getHeaders().clearContentHeaders());
197+
.doOnError(t -> getHeaders().clearContentHeaders());
195198
}
196199

197200
@Override
@@ -217,21 +220,30 @@ protected Mono<Void> doCommit(@Nullable Supplier<? extends Mono<Void>> writeActi
217220
if (!this.state.compareAndSet(State.NEW, State.COMMITTING)) {
218221
return Mono.empty();
219222
}
220-
this.commitActions.add(() ->
221-
Mono.fromRunnable(() -> {
222-
applyStatusCode();
223-
applyHeaders();
224-
applyCookies();
225-
this.state.set(State.COMMITTED);
226-
}));
227-
if (writeAction != null) {
228-
this.commitActions.add(writeAction);
223+
224+
Flux<Void> allActions = Flux.empty();
225+
226+
if (!this.commitActions.isEmpty()) {
227+
allActions = Flux.concat(Flux.fromIterable(this.commitActions).map(Supplier::get))
228+
.doOnError(ex -> {
229+
if (this.state.compareAndSet(State.COMMITTING, State.NEW)) {
230+
getHeaders().clearContentHeaders();
231+
}
232+
});
229233
}
230-
Flux<Void> commit = Flux.empty();
231-
for (Supplier<? extends Mono<Void>> action : this.commitActions) {
232-
commit = commit.concatWith(action.get());
234+
235+
allActions = allActions.concatWith(Mono.fromRunnable(() -> {
236+
applyStatusCode();
237+
applyHeaders();
238+
applyCookies();
239+
this.state.set(State.COMMITTED);
240+
}));
241+
242+
if (writeAction != null) {
243+
allActions = allActions.concatWith(writeAction.get());
233244
}
234-
return commit.then();
245+
246+
return allActions.then();
235247
}
236248

237249

spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@
2020
import java.nio.charset.StandardCharsets;
2121
import java.util.ArrayList;
2222
import java.util.List;
23+
import java.util.function.Consumer;
24+
import java.util.function.Supplier;
2325

2426
import org.junit.jupiter.api.Test;
2527
import org.reactivestreams.Publisher;
2628
import reactor.core.publisher.Flux;
2729
import reactor.core.publisher.Mono;
30+
import reactor.test.StepVerifier;
2831

2932
import org.springframework.core.io.buffer.DataBuffer;
3033
import org.springframework.core.io.buffer.DefaultDataBuffer;
@@ -36,14 +39,16 @@
3639
import static org.assertj.core.api.Assertions.assertThat;
3740

3841
/**
42+
* Unit tests for {@link AbstractServerHttpRequest}.
43+
*
3944
* @author Rossen Stoyanchev
4045
* @author Sebastien Deleuze
4146
* @author Brian Clozel
4247
*/
4348
public class ServerHttpResponseTests {
4449

4550
@Test
46-
void writeWith() throws Exception {
51+
void writeWith() {
4752
TestServerHttpResponse response = new TestServerHttpResponse();
4853
response.writeWith(Flux.just(wrap("a"), wrap("b"), wrap("c"))).block();
4954

@@ -58,7 +63,7 @@ void writeWith() throws Exception {
5863
}
5964

6065
@Test // SPR-14952
61-
void writeAndFlushWithFluxOfDefaultDataBuffer() throws Exception {
66+
void writeAndFlushWithFluxOfDefaultDataBuffer() {
6267
TestServerHttpResponse response = new TestServerHttpResponse();
6368
Flux<Flux<DefaultDataBuffer>> flux = Flux.just(Flux.just(wrap("foo")));
6469
response.writeAndFlushWith(flux).block();
@@ -72,18 +77,18 @@ void writeAndFlushWithFluxOfDefaultDataBuffer() throws Exception {
7277
}
7378

7479
@Test
75-
void writeWithFluxError() throws Exception {
80+
void writeWithFluxError() {
7681
IllegalStateException error = new IllegalStateException("boo");
7782
writeWithError(Flux.error(error));
7883
}
7984

8085
@Test
81-
void writeWithMonoError() throws Exception {
86+
void writeWithMonoError() {
8287
IllegalStateException error = new IllegalStateException("boo");
8388
writeWithError(Mono.error(error));
8489
}
8590

86-
void writeWithError(Publisher<DataBuffer> body) throws Exception {
91+
void writeWithError(Publisher<DataBuffer> body) {
8792
TestServerHttpResponse response = new TestServerHttpResponse();
8893
HttpHeaders headers = response.getHeaders();
8994
headers.setContentType(MediaType.APPLICATION_JSON);
@@ -100,7 +105,7 @@ void writeWithError(Publisher<DataBuffer> body) throws Exception {
100105
}
101106

102107
@Test
103-
void setComplete() throws Exception {
108+
void setComplete() {
104109
TestServerHttpResponse response = new TestServerHttpResponse();
105110
response.setComplete().block();
106111

@@ -111,7 +116,7 @@ void setComplete() throws Exception {
111116
}
112117

113118
@Test
114-
void beforeCommitWithComplete() throws Exception {
119+
void beforeCommitWithComplete() {
115120
ResponseCookie cookie = ResponseCookie.from("ID", "123").build();
116121
TestServerHttpResponse response = new TestServerHttpResponse();
117122
response.beforeCommit(() -> Mono.fromRunnable(() -> response.getCookies().add(cookie.getName(), cookie)));
@@ -129,7 +134,7 @@ void beforeCommitWithComplete() throws Exception {
129134
}
130135

131136
@Test
132-
void beforeCommitActionWithSetComplete() throws Exception {
137+
void beforeCommitActionWithSetComplete() {
133138
ResponseCookie cookie = ResponseCookie.from("ID", "123").build();
134139
TestServerHttpResponse response = new TestServerHttpResponse();
135140
response.beforeCommit(() -> {
@@ -145,6 +150,32 @@ void beforeCommitActionWithSetComplete() throws Exception {
145150
assertThat(response.getCookies().getFirst("ID")).isSameAs(cookie);
146151
}
147152

153+
@Test // gh-24186
154+
void beforeCommitErrorShouldLeaveResponseNotCommitted() {
155+
156+
Consumer<Supplier<Mono<Void>>> tester = preCommitAction -> {
157+
TestServerHttpResponse response = new TestServerHttpResponse();
158+
response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
159+
response.getHeaders().setContentLength(3);
160+
response.beforeCommit(preCommitAction);
161+
162+
StepVerifier.create(response.writeWith(Flux.just(wrap("body"))))
163+
.expectErrorMessage("Max sessions")
164+
.verify();
165+
166+
assertThat(response.statusCodeWritten).isFalse();
167+
assertThat(response.headersWritten).isFalse();
168+
assertThat(response.cookiesWritten).isFalse();
169+
assertThat(response.isCommitted()).isFalse();
170+
assertThat(response.getHeaders()).isEmpty();
171+
};
172+
173+
tester.accept(() -> Mono.error(new IllegalStateException("Max sessions")));
174+
tester.accept(() -> {
175+
throw new IllegalStateException("Max sessions");
176+
});
177+
}
178+
148179

149180
private DefaultDataBuffer wrap(String a) {
150181
return new DefaultDataBufferFactory().wrap(ByteBuffer.wrap(a.getBytes(StandardCharsets.UTF_8)));

0 commit comments

Comments
 (0)