22
22
import java .nio .charset .Charset ;
23
23
import java .util .List ;
24
24
import java .util .Map ;
25
- import java .util .concurrent .atomic .AtomicBoolean ;
26
25
import javax .servlet .ServletOutputStream ;
27
26
import javax .servlet .WriteListener ;
28
27
import javax .servlet .http .Cookie ;
29
28
import javax .servlet .http .HttpServletResponse ;
30
29
31
30
import org .reactivestreams .Processor ;
31
+ import org .reactivestreams .Publisher ;
32
32
33
33
import org .springframework .core .io .buffer .DataBuffer ;
34
34
import org .springframework .core .io .buffer .DataBufferFactory ;
44
44
*/
45
45
public class ServletServerHttpResponse extends AbstractListenerServerHttpResponse {
46
46
47
- private final AtomicBoolean listenerRegistered = new AtomicBoolean ();
47
+ private final ResponseBodyWriteListener writeListener = new ResponseBodyWriteListener ();
48
48
49
49
private volatile ResponseBodyProcessor bodyProcessor ;
50
50
@@ -112,15 +112,17 @@ protected void writeCookies() {
112
112
}
113
113
}
114
114
115
- private void registerListener () throws IOException {
116
- if (this .listenerRegistered .compareAndSet (false , true )) {
117
- ResponseBodyWriteListener writeListener = new ResponseBodyWriteListener ();
118
- this .response .getOutputStream ().setWriteListener (writeListener );
115
+ private void registerListener () {
116
+ try {
117
+ outputStream ().setWriteListener (writeListener );
118
+ }
119
+ catch (IOException e ) {
120
+ throw new UncheckedIOException (e );
119
121
}
120
122
}
121
123
122
124
private void flush () throws IOException {
123
- ServletOutputStream outputStream = this . response . getOutputStream ();
125
+ ServletOutputStream outputStream = outputStream ();
124
126
if (outputStream .isReady ()) {
125
127
try {
126
128
outputStream .flush ();
@@ -136,22 +138,15 @@ private void flush() throws IOException {
136
138
}
137
139
}
138
140
139
- @ Override
140
- protected ResponseBodyProcessor createBodyProcessor () {
141
- try {
142
- registerListener ();
143
- this .bodyProcessor = new ResponseBodyProcessor (this .response .getOutputStream (),
144
- this .bufferSize );
145
- return this .bodyProcessor ;
146
- }
147
- catch (IOException ex ) {
148
- throw new UncheckedIOException (ex );
149
- }
141
+ private ServletOutputStream outputStream () throws IOException {
142
+ return this .response .getOutputStream ();
150
143
}
151
144
152
145
@ Override
153
- protected AbstractResponseBodyFlushProcessor createBodyFlushProcessor () {
154
- return new ResponseBodyFlushProcessor ();
146
+ protected Processor <Publisher <DataBuffer >, Void > createBodyFlushProcessor () {
147
+ Processor <Publisher <DataBuffer >, Void > processor = new ResponseBodyFlushProcessor ();
148
+ registerListener ();
149
+ return processor ;
155
150
}
156
151
157
152
private class ResponseBodyProcessor extends AbstractResponseBodyProcessor {
@@ -238,7 +233,13 @@ private class ResponseBodyFlushProcessor extends AbstractResponseBodyFlushProces
238
233
239
234
@ Override
240
235
protected Processor <DataBuffer , Void > createBodyProcessor () {
241
- return ServletServerHttpResponse .this .createBodyProcessor ();
236
+ try {
237
+ bodyProcessor = new ResponseBodyProcessor (outputStream (), bufferSize );
238
+ return bodyProcessor ;
239
+ }
240
+ catch (IOException ex ) {
241
+ throw new UncheckedIOException (ex );
242
+ }
242
243
}
243
244
244
245
@ Override
0 commit comments