Skip to content

Commit 3015286

Browse files
violetaggrstoyanchev
authored andcommitted
Refactor AbstractResponseBodyFlushProcessor states
With the current state machine - the implementation can hang after the last element when executing on Jetty. - in some cases there will be no flush after the last Publisher<DataBuffer>.
1 parent 4798a1e commit 3015286

File tree

2 files changed

+117
-21
lines changed

2 files changed

+117
-21
lines changed

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -128,48 +128,63 @@ private enum State {
128128
public void onSubscribe(AbstractResponseBodyFlushProcessor processor,
129129
Subscription subscription) {
130130
Objects.requireNonNull(subscription, "Subscription cannot be null");
131-
if (processor.changeState(this, SUBSCRIBED)) {
131+
if (processor.changeState(this, REQUESTED)) {
132132
processor.subscription = subscription;
133133
subscription.request(1);
134134
}
135135
else {
136136
super.onSubscribe(processor, subscription);
137137
}
138138
}
139-
}, SUBSCRIBED {
139+
},
140+
REQUESTED {
140141
@Override
141142
public void onNext(AbstractResponseBodyFlushProcessor processor,
142143
Publisher<DataBuffer> chunk) {
143-
Processor<DataBuffer, Void> chunkProcessor =
144-
processor.createBodyProcessor();
145-
chunk.subscribe(chunkProcessor);
146-
chunkProcessor.subscribe(new WriteSubscriber(processor));
144+
if (processor.changeState(this, RECEIVED)) {
145+
Processor<DataBuffer, Void> chunkProcessor =
146+
processor.createBodyProcessor();
147+
chunk.subscribe(chunkProcessor);
148+
chunkProcessor.subscribe(new WriteSubscriber(processor));
149+
}
147150
}
148151

149152
@Override
150153
void onComplete(AbstractResponseBodyFlushProcessor processor) {
151-
processor.subscriberCompleted = true;
154+
if (processor.changeState(this, COMPLETED)) {
155+
processor.publisherDelegate.publishComplete();
156+
}
152157
}
153-
158+
},
159+
RECEIVED {
154160
@Override
155161
public void writeComplete(AbstractResponseBodyFlushProcessor processor) {
162+
try {
163+
processor.flush();
164+
}
165+
catch (IOException ex) {
166+
processor.cancel();
167+
processor.onError(ex);
168+
}
169+
156170
if (processor.subscriberCompleted) {
157171
if (processor.changeState(this, COMPLETED)) {
158172
processor.publisherDelegate.publishComplete();
159173
}
160174
}
161175
else {
162-
try {
163-
processor.flush();
176+
if (processor.changeState(this, REQUESTED)) {
164177
processor.subscription.request(1);
165178
}
166-
catch (IOException ex) {
167-
processor.cancel();
168-
processor.onError(ex);
169-
}
170179
}
171180
}
172-
}, COMPLETED {
181+
182+
@Override
183+
void onComplete(AbstractResponseBodyFlushProcessor processor) {
184+
processor.subscriberCompleted = true;
185+
}
186+
},
187+
COMPLETED {
173188
@Override
174189
public void onNext(AbstractResponseBodyFlushProcessor processor,
175190
Publisher<DataBuffer> publisher) {
@@ -186,11 +201,6 @@ void onError(AbstractResponseBodyFlushProcessor processor, Throwable t) {
186201
void onComplete(AbstractResponseBodyFlushProcessor processor) {
187202
// ignore
188203
}
189-
190-
@Override
191-
public void writeComplete(AbstractResponseBodyFlushProcessor processor) {
192-
// ignore
193-
}
194204
};
195205

196206
public void onSubscribe(AbstractResponseBodyFlushProcessor processor,
@@ -214,7 +224,7 @@ void onComplete(AbstractResponseBodyFlushProcessor processor) {
214224
}
215225

216226
public void writeComplete(AbstractResponseBodyFlushProcessor processor) {
217-
throw new IllegalStateException(toString());
227+
// ignore
218228
}
219229

220230
private static class WriteSubscriber implements Subscriber<Void> {
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2002-2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.http.server.reactive;
18+
19+
import java.net.URI;
20+
import java.nio.charset.StandardCharsets;
21+
import java.util.Random;
22+
23+
import org.junit.Test;
24+
import org.springframework.core.io.buffer.DataBuffer;
25+
import org.springframework.http.RequestEntity;
26+
import org.springframework.http.ResponseEntity;
27+
import org.springframework.http.server.reactive.HttpHandler;
28+
import org.springframework.http.server.reactive.ServerHttpRequest;
29+
import org.springframework.http.server.reactive.ServerHttpResponse;
30+
import org.springframework.web.client.RestTemplate;
31+
32+
import static org.junit.Assert.*;
33+
34+
import reactor.core.publisher.Flux;
35+
import reactor.core.publisher.Mono;
36+
37+
/**
38+
* @author Violeta Georgieva
39+
* @since 5.0
40+
*/
41+
public class WriteOnlyHandlerIntegrationTests extends AbstractHttpHandlerIntegrationTests {
42+
43+
private static final int REQUEST_SIZE = 4096 * 3;
44+
45+
private Random rnd = new Random();
46+
47+
private byte[] body;
48+
49+
50+
@Override
51+
protected WriteOnlyHandler createHttpHandler() {
52+
return new WriteOnlyHandler();
53+
}
54+
55+
56+
@Test
57+
public void writeOnly() throws Exception {
58+
RestTemplate restTemplate = new RestTemplate();
59+
60+
this.body = randomBytes();
61+
RequestEntity<byte[]> request = RequestEntity.post(
62+
new URI("http://localhost:" + port)).body(
63+
"".getBytes(StandardCharsets.UTF_8));
64+
ResponseEntity<byte[]> response = restTemplate.exchange(request, byte[].class);
65+
66+
assertArrayEquals(body, response.getBody());
67+
}
68+
69+
70+
private byte[] randomBytes() {
71+
byte[] buffer = new byte[REQUEST_SIZE];
72+
rnd.nextBytes(buffer);
73+
return buffer;
74+
}
75+
76+
77+
public class WriteOnlyHandler implements HttpHandler {
78+
79+
@Override
80+
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
81+
DataBuffer buffer = response.bufferFactory().allocateBuffer(body.length);
82+
buffer.write(body);
83+
return response.writeAndFlushWith(Flux.just(Flux.just(buffer)));
84+
}
85+
}
86+
}

0 commit comments

Comments
 (0)