Skip to content

Commit 4798a1e

Browse files
violetaggrstoyanchev
authored andcommitted
Cancel Subscription when onError is invoked internally
AbstractResponseBodyProcessor.onError and AbstractResponseBodyFlushProcessor.onError will be invoked when: - The Publisher wants to signal with onError that there are failures. Once onError is invoked the Subscription should be considered canceled. - The internal implementation wants to signal with onError that there are failures. In this use case the implementation should invoke Subscription.cancel()
1 parent 16939b7 commit 4798a1e

File tree

3 files changed

+13
-2
lines changed

3 files changed

+13
-2
lines changed

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@ public final void subscribe(Subscriber<? super Void> subscriber) {
106106
*/
107107
protected abstract void flush() throws IOException;
108108

109+
private void cancel() {
110+
this.subscription.cancel();
111+
}
112+
109113
private void writeComplete() {
110114
if (logger.isTraceEnabled()) {
111115
logger.trace(this.state + " writeComplete");
@@ -157,11 +161,12 @@ public void writeComplete(AbstractResponseBodyFlushProcessor processor) {
157161
else {
158162
try {
159163
processor.flush();
164+
processor.subscription.request(1);
160165
}
161166
catch (IOException ex) {
167+
processor.cancel();
162168
processor.onError(ex);
163169
}
164-
processor.subscription.request(1);
165170
}
166171
}
167172
}, COMPLETED {
@@ -231,6 +236,7 @@ public void onNext(Void aVoid) {
231236

232237
@Override
233238
public void onError(Throwable t) {
239+
processor.cancel();
234240
processor.onError(t);
235241
}
236242

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,10 @@ protected boolean isWritePossible() {
159159
*/
160160
protected abstract boolean write(DataBuffer dataBuffer) throws IOException;
161161

162+
protected void cancel() {
163+
this.subscription.cancel();
164+
}
165+
162166
private boolean changeState(State oldState, State newState) {
163167
return this.state.compareAndSet(oldState, newState);
164168
}
@@ -220,7 +224,6 @@ void onNext(AbstractResponseBodyProcessor processor, DataBuffer dataBuffer) {
220224
@Override
221225
void onComplete(AbstractResponseBodyProcessor processor) {
222226
if (processor.changeState(this, COMPLETED)) {
223-
processor.subscriberCompleted = true;
224227
processor.publisherDelegate.publishComplete();
225228
}
226229
}
@@ -258,6 +261,7 @@ void onWritePossible(AbstractResponseBodyProcessor processor) {
258261
}
259262
}
260263
catch (IOException ex) {
264+
processor.cancel();
261265
processor.onError(ex);
262266
}
263267
}

spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ public void onWritePossible() throws IOException {
224224
@Override
225225
public void onError(Throwable ex) {
226226
if (bodyProcessor != null) {
227+
bodyProcessor.cancel();
227228
bodyProcessor.onError(ex);
228229
}
229230
}

0 commit comments

Comments
 (0)