19
19
import java .io .IOException ;
20
20
import java .util .ArrayList ;
21
21
import java .util .List ;
22
- import java .util .concurrent .atomic . AtomicReference ;
22
+ import java .util .concurrent .locks . ReentrantLock ;
23
23
import java .util .function .Consumer ;
24
24
25
25
import javax .servlet .AsyncContext ;
34
34
import org .springframework .lang .Nullable ;
35
35
import org .springframework .util .Assert ;
36
36
import org .springframework .web .context .request .ServletWebRequest ;
37
+ import org .springframework .web .util .DisconnectedClientHelper ;
37
38
38
39
/**
39
40
* A Servlet implementation of {@link AsyncWebRequest}.
@@ -60,9 +61,9 @@ public class StandardServletAsyncWebRequest extends ServletWebRequest implements
60
61
@ Nullable
61
62
private AsyncContext asyncContext ;
62
63
63
- private final AtomicReference < State > state ;
64
+ private State state ;
64
65
65
- private volatile boolean hasError ;
66
+ private final ReentrantLock stateLock = new ReentrantLock () ;
66
67
67
68
68
69
/**
@@ -87,13 +88,7 @@ public StandardServletAsyncWebRequest(HttpServletRequest request, HttpServletRes
87
88
88
89
super (request , new LifecycleHttpServletResponse (response ));
89
90
90
- if (previousRequest != null ) {
91
- this .state = previousRequest .state ;
92
- this .hasError = previousRequest .hasError ;
93
- }
94
- else {
95
- this .state = new AtomicReference <>(State .ACTIVE );
96
- }
91
+ this .state = (previousRequest != null ? previousRequest .state : State .ACTIVE );
97
92
98
93
//noinspection DataFlowIssue
99
94
((LifecycleHttpServletResponse ) getResponse ()).setAsyncWebRequest (this );
@@ -137,7 +132,7 @@ public boolean isAsyncStarted() {
137
132
*/
138
133
@ Override
139
134
public boolean isAsyncComplete () {
140
- return (this .state . get () == State .COMPLETED );
135
+ return (this .state == State .COMPLETED );
141
136
}
142
137
143
138
@ Override
@@ -184,20 +179,41 @@ public void onTimeout(AsyncEvent event) throws IOException {
184
179
185
180
@ Override
186
181
public void onError (AsyncEvent event ) throws IOException {
187
- transitionToErrorState ();
188
- this .exceptionHandlers .forEach (consumer -> consumer .accept (event .getThrowable ()));
182
+ this .stateLock .lock ();
183
+ try {
184
+ transitionToErrorState ();
185
+ Throwable ex = event .getThrowable ();
186
+ this .exceptionHandlers .forEach (consumer -> consumer .accept (ex ));
187
+
188
+ // We skip ASYNC dispatches for "disconnected client" errors,
189
+ // but can only complete from a Servlet container thread
190
+
191
+ if (DisconnectedClientHelper .isClientDisconnectedException (ex ) && this .state != State .COMPLETED ) {
192
+ this .asyncContext .complete ();
193
+ }
194
+ }
195
+ finally {
196
+ this .stateLock .unlock ();
197
+ }
189
198
}
190
199
191
200
private void transitionToErrorState () {
192
- this .hasError = true ;
193
- this .state .compareAndSet (State .ACTIVE , State .ERROR );
201
+ if (this .state == State .ACTIVE ) {
202
+ this .state = State .ERROR ;
203
+ }
194
204
}
195
205
196
206
@ Override
197
207
public void onComplete (AsyncEvent event ) throws IOException {
198
- this .completionHandlers .forEach (Runnable ::run );
199
- this .asyncContext = null ;
200
- this .state .set (State .COMPLETED );
208
+ this .stateLock .lock ();
209
+ try {
210
+ this .completionHandlers .forEach (Runnable ::run );
211
+ this .asyncContext = null ;
212
+ this .state = State .COMPLETED ;
213
+ }
214
+ finally {
215
+ this .stateLock .unlock ();
216
+ }
201
217
}
202
218
203
219
@@ -256,59 +272,76 @@ public boolean isReady() {
256
272
257
273
@ Override
258
274
public void setWriteListener (WriteListener writeListener ) {
275
+ throw new UnsupportedOperationException ();
259
276
}
260
277
261
278
@ Override
262
279
public void write (int b ) throws IOException {
263
- checkState ();
280
+ obtainLockAndCheckState ();
264
281
try {
265
282
this .delegate .getOutputStream ().write (b );
266
283
}
267
284
catch (IOException ex ) {
268
285
handleIOException (ex , "ServletOutputStream failed to write" );
269
286
}
287
+ finally {
288
+ releaseLock ();
289
+ }
270
290
}
271
291
272
292
public void write (byte [] buf , int offset , int len ) throws IOException {
273
- checkState ();
293
+ obtainLockAndCheckState ();
274
294
try {
275
295
this .delegate .getOutputStream ().write (buf , offset , len );
276
296
}
277
297
catch (IOException ex ) {
278
298
handleIOException (ex , "ServletOutputStream failed to write" );
279
299
}
300
+ finally {
301
+ releaseLock ();
302
+ }
280
303
}
281
304
282
305
@ Override
283
306
public void flush () throws IOException {
284
- checkState ();
307
+ obtainLockAndCheckState ();
285
308
try {
286
309
this .delegate .getOutputStream ().flush ();
287
310
}
288
311
catch (IOException ex ) {
289
312
handleIOException (ex , "ServletOutputStream failed to flush" );
290
313
}
314
+ finally {
315
+ releaseLock ();
316
+ }
291
317
}
292
318
293
319
@ Override
294
320
public void close () throws IOException {
295
- checkState ();
321
+ obtainLockAndCheckState ();
296
322
try {
297
323
this .delegate .getOutputStream ().close ();
298
324
}
299
325
catch (IOException ex ) {
300
326
handleIOException (ex , "ServletOutputStream failed to close" );
301
327
}
328
+ finally {
329
+ releaseLock ();
330
+ }
302
331
}
303
332
304
- private void checkState () throws AsyncRequestNotUsableException {
305
- if (this .asyncWebRequest .state . get () != State .ACTIVE ) {
333
+ private void obtainLockAndCheckState () throws AsyncRequestNotUsableException {
334
+ if (! this .asyncWebRequest .stateLock . tryLock () || this . asyncWebRequest . state != State .ACTIVE ) {
306
335
throw new AsyncRequestNotUsableException ("Response not usable after " +
307
- (this .asyncWebRequest .state . get () == State .COMPLETED ?
336
+ (this .asyncWebRequest .state == State .COMPLETED ?
308
337
"async request completion" : "onError notification" ) + "." );
309
338
}
310
339
}
311
340
341
+ private void releaseLock () {
342
+ this .asyncWebRequest .stateLock .unlock ();
343
+ }
344
+
312
345
private void handleIOException (IOException ex , String msg ) throws AsyncRequestNotUsableException {
313
346
this .asyncWebRequest .transitionToErrorState ();
314
347
throw new AsyncRequestNotUsableException (msg , ex );
0 commit comments