Skip to content

Commit 1c6bab2

Browse files
committed
Polishing
See gh-26434
1 parent 3e502d4 commit 1c6bab2

File tree

4 files changed

+34
-25
lines changed

4 files changed

+34
-25
lines changed

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -38,12 +38,12 @@ public abstract class AbstractListenerServerHttpResponse extends AbstractServerH
3838
private final AtomicBoolean writeCalled = new AtomicBoolean();
3939

4040

41-
public AbstractListenerServerHttpResponse(DataBufferFactory dataBufferFactory) {
42-
super(dataBufferFactory);
41+
public AbstractListenerServerHttpResponse(DataBufferFactory bufferFactory) {
42+
super(bufferFactory);
4343
}
4444

45-
public AbstractListenerServerHttpResponse(DataBufferFactory dataBufferFactory, HttpHeaders headers) {
46-
super(dataBufferFactory, headers);
45+
public AbstractListenerServerHttpResponse(DataBufferFactory bufferFactory, HttpHeaders headers) {
46+
super(bufferFactory, headers);
4747
}
4848

4949

@@ -56,15 +56,15 @@ protected final Mono<Void> writeWithInternal(Publisher<? extends DataBuffer> bod
5656
protected final Mono<Void> writeAndFlushWithInternal(
5757
Publisher<? extends Publisher<? extends DataBuffer>> body) {
5858

59-
if (this.writeCalled.compareAndSet(false, true)) {
60-
Processor<? super Publisher<? extends DataBuffer>, Void> processor = createBodyFlushProcessor();
61-
return Mono.from(subscriber -> {
62-
body.subscribe(processor);
63-
processor.subscribe(subscriber);
64-
});
59+
if (!this.writeCalled.compareAndSet(false, true)) {
60+
return Mono.error(new IllegalStateException(
61+
"writeWith() or writeAndFlushWith() has already been called"));
6562
}
66-
return Mono.error(new IllegalStateException(
67-
"writeWith() or writeAndFlushWith() has already been called"));
63+
Processor<? super Publisher<? extends DataBuffer>, Void> processor = createBodyFlushProcessor();
64+
return Mono.from(subscriber -> {
65+
body.subscribe(processor);
66+
processor.subscribe(subscriber);
67+
});
6868
}
6969

7070
/**

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
5858
@Nullable
5959
private Subscription subscription;
6060

61-
private volatile boolean subscriberCompleted;
61+
private volatile boolean sourceCompleted;
6262

6363
private final WriteResultPublisher resultPublisher;
6464

@@ -293,7 +293,7 @@ public <T> void writeComplete(AbstractListenerWriteFlushProcessor<T> processor)
293293
return;
294294
}
295295
if (processor.changeState(this, REQUESTED)) {
296-
if (processor.subscriberCompleted) {
296+
if (processor.sourceCompleted) {
297297
handleSubscriberCompleted(processor);
298298
}
299299
else {
@@ -304,7 +304,7 @@ public <T> void writeComplete(AbstractListenerWriteFlushProcessor<T> processor)
304304
}
305305
@Override
306306
public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
307-
processor.subscriberCompleted = true;
307+
processor.sourceCompleted = true;
308308
// A competing write might have completed very quickly
309309
if (processor.state.get().equals(State.REQUESTED)) {
310310
handleSubscriberCompleted(processor);

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -64,7 +64,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
6464
private volatile T currentData;
6565

6666
/* Indicates "onComplete" was received during the (last) write. */
67-
private volatile boolean subscriberCompleted;
67+
private volatile boolean sourceCompleted;
6868

6969
/**
7070
* Indicates we're waiting for one last isReady-onWritePossible cycle
@@ -374,7 +374,7 @@ else if (processor.changeState(this, WRITING)) {
374374
if (processor.write(data)) {
375375
if (processor.changeState(WRITING, REQUESTED)) {
376376
processor.currentData = null;
377-
if (processor.subscriberCompleted) {
377+
if (processor.sourceCompleted) {
378378
processor.readyToCompleteAfterLastWrite = true;
379379
processor.changeStateToReceived(REQUESTED);
380380
}
@@ -397,7 +397,7 @@ else if (processor.changeState(this, WRITING)) {
397397

398398
@Override
399399
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
400-
processor.subscriberCompleted = true;
400+
processor.sourceCompleted = true;
401401
// A competing write might have completed very quickly
402402
if (processor.state.get().equals(State.REQUESTED)) {
403403
processor.changeStateToComplete(State.REQUESTED);
@@ -408,7 +408,7 @@ public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
408408
WRITING {
409409
@Override
410410
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
411-
processor.subscriberCompleted = true;
411+
processor.sourceCompleted = true;
412412
// A competing write might have completed very quickly
413413
if (processor.state.get().equals(State.REQUESTED)) {
414414
processor.changeStateToComplete(State.REQUESTED);

spring-web/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -239,15 +239,24 @@ private static void runIfAsyncNotComplete(AsyncContext asyncContext, AtomicBoole
239239
}
240240

241241

242+
/**
243+
* AsyncListener to complete the {@link AsyncContext} in case of error or
244+
* timeout notifications from the container
245+
* <p>Additional {@link AsyncListener}s are registered in
246+
* {@link ServletServerHttpRequest} to signal onError/onComplete to the
247+
* request body Subscriber, and in {@link ServletServerHttpResponse} to
248+
* cancel the write Publisher and signal onError/onComplete downstream to
249+
* the writing result Subscriber.
250+
*/
242251
private static class HandlerResultAsyncListener implements AsyncListener {
243252

244253
private final AtomicBoolean isCompleted;
245254

246255
private final String logPrefix;
247256

248-
public HandlerResultAsyncListener(AtomicBoolean isCompleted, ServletServerHttpRequest httpRequest) {
257+
public HandlerResultAsyncListener(AtomicBoolean isCompleted, ServletServerHttpRequest request) {
249258
this.isCompleted = isCompleted;
250-
this.logPrefix = httpRequest.getLogPrefix();
259+
this.logPrefix = request.getLogPrefix();
251260
}
252261

253262
@Override
@@ -277,7 +286,7 @@ public void onComplete(AsyncEvent event) {
277286
}
278287

279288

280-
private class HandlerResultSubscriber implements Subscriber<Void> {
289+
private static class HandlerResultSubscriber implements Subscriber<Void> {
281290

282291
private final AsyncContext asyncContext;
283292

0 commit comments

Comments
 (0)