Skip to content

Commit 102a0ad

Browse files
committed
Polish
1 parent c1b191e commit 102a0ad

File tree

4 files changed

+101
-65
lines changed

4 files changed

+101
-65
lines changed

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,18 +73,30 @@ public void subscribe(Subscriber<? super T> subscriber) {
7373
}
7474

7575

76-
// Methods for sub-classes to delegate to, when async I/O events occur...
76+
// Async I/O notification methods...
7777

78+
/**
79+
* Invoked when reading is possible, either in the same thread after a check
80+
* via {@link #checkOnDataAvailable()}, or as a callback from the underlying
81+
* container.
82+
*/
7883
public final void onDataAvailable() {
7984
this.logger.trace("I/O event onDataAvailable");
8085
this.state.get().onDataAvailable(this);
8186
}
8287

88+
/**
89+
* Sub-classes can call this method to delegate a contain notification when
90+
* all data has been read.
91+
*/
8392
public void onAllDataRead() {
8493
this.logger.trace("I/O event onAllDataRead");
8594
this.state.get().onAllDataRead(this);
8695
}
8796

97+
/**
98+
* Sub-classes can call this to delegate container error notifications.
99+
*/
88100
public final void onError(Throwable ex) {
89101
if (this.logger.isTraceEnabled()) {
90102
this.logger.trace("I/O event onError: " + ex);
@@ -93,11 +105,11 @@ public final void onError(Throwable ex) {
93105
}
94106

95107

96-
// Methods for sub-classes to implement...
108+
// Read API methods to be implemented or template methods to override...
97109

98110
/**
99-
* Check if data is available, calling {@link #onDataAvailable()} either
100-
* immediately or later when reading is possible.
111+
* Check if data is available and either call {@link #onDataAvailable()}
112+
* immediately or schedule a notification.
101113
*/
102114
protected abstract void checkOnDataAvailable();
103115

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java

Lines changed: 47 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
5353
private final WriteResultPublisher resultPublisher = new WriteResultPublisher();
5454

5555

56-
// Subscriber methods and methods to notify of async I/O events...
56+
// Subscriber methods and async I/O notification methods...
5757

5858
@Override
5959
public final void onSubscribe(Subscription subscription) {
@@ -67,8 +67,8 @@ public final void onNext(Publisher<? extends T> publisher) {
6767
}
6868

6969
/**
70-
* Notify of an error. This can come from the upstream write Publisher or
71-
* from sub-classes as a result of an I/O error.
70+
* Error signal from the upstream, write Publisher. This is also used by
71+
* sub-classes to delegate error notifications from the container.
7272
*/
7373
@Override
7474
public final void onError(Throwable ex) {
@@ -79,68 +79,76 @@ public final void onError(Throwable ex) {
7979
}
8080

8181
/**
82-
* Notify of completion. This can come from the upstream write Publisher or
83-
* from sub-classes as a result of an I/O completion event.
82+
* Completion signal from the upstream, write Publisher. This is also used
83+
* by sub-classes to delegate completion notifications from the container.
8484
*/
8585
@Override
8686
public final void onComplete() {
8787
logger.trace("Received onComplete");
8888
this.state.get().onComplete(this);
8989
}
9090

91+
/**
92+
* Invoked when flusing is possible, either in the same thread after a check
93+
* via {@link #isWritePossible()}, or as a callback from the underlying
94+
* container.
95+
*/
96+
protected final void onFlushPossible() {
97+
this.state.get().onFlushPossible(this);
98+
}
99+
100+
/**
101+
* Invoked during an error or completion callback from the underlying
102+
* container to cancel the upstream subscription.
103+
*/
91104
protected void cancel() {
92105
this.logger.trace("Received request to cancel");
93106
if (this.subscription != null) {
94107
this.subscription.cancel();
95108
}
96109
}
97110

98-
// Publisher method...
111+
112+
// Publisher implementation for result notifications...
99113

100114
@Override
101115
public final void subscribe(Subscriber<? super Void> subscriber) {
102116
this.resultPublisher.subscribe(subscriber);
103117
}
104118

105119

106-
// Methods for sub-classes to implement or override...
120+
// Write API methods to be implemented or template methods to override...
107121

108122
/**
109-
* Create a new processor for subscribing to the next flush boundary.
123+
* Create a new processor for the current flush boundary.
110124
*/
111125
protected abstract Processor<? super T, Void> createWriteProcessor();
112126

113127
/**
114-
* Flush the output if ready, or otherwise {@link #isFlushPending()} should
115-
* return true after that.
128+
* Whether writing/flushing is possible.
116129
*/
117-
protected abstract void flush() throws IOException;
130+
protected abstract boolean isWritePossible();
118131

119132
/**
120-
* Invoked when an error happens while flushing. Defaults to no-op.
121-
* Servlet 3.1 based implementations will receive an
122-
* {@link javax.servlet.AsyncListener#onError} event.
133+
* Flush the output if ready, or otherwise {@link #isFlushPending()} should
134+
* return true after.
123135
*/
124-
protected void flushingFailed(Throwable t) {
125-
}
136+
protected abstract void flush() throws IOException;
126137

127138
/**
128139
* Whether flushing is pending.
129140
*/
130141
protected abstract boolean isFlushPending();
131142

132143
/**
133-
* Listeners can call this to notify when flushing is possible.
144+
* Invoked when an error happens while flushing. Sub-classes may choose
145+
* to ignore this if they know the underlying API will provide an error
146+
* notification in a container thread.
147+
* <p>Defaults to no-op.
134148
*/
135-
protected final void onFlushPossible() {
136-
this.state.get().onFlushPossible(this);
149+
protected void flushingFailed(Throwable t) {
137150
}
138151

139-
/**
140-
* Whether writing is possible.
141-
*/
142-
protected abstract boolean isWritePossible();
143-
144152

145153
// Private methods for use in State...
146154

@@ -167,16 +175,16 @@ private void flushIfPossible() {
167175
* Represents a state for the {@link Processor} to be in.
168176
*
169177
* <p><pre>
170-
* UNSUBSCRIBED
171-
* |
172-
* v
173-
* +--- REQUESTED <--------> RECEIVED ---+
174-
* | | |
175-
* | | |
176-
* | FLUSHING <------+ |
177-
* | | |
178-
* | v |
179-
* +----------> COMPLETED <--------------+
178+
* UNSUBSCRIBED
179+
* |
180+
* v
181+
* REQUESTED <---> RECEIVED ------+
182+
* | | |
183+
* | v |
184+
* | FLUSHING |
185+
* | | |
186+
* | v |
187+
* +--------> COMPLETED <-----+
180188
* </pre>
181189
*/
182190
private enum State {
@@ -269,7 +277,7 @@ public <T> void onFlushPossible(AbstractListenerWriteFlushProcessor<T> processor
269277
processor.state.get().onComplete(processor);
270278
}
271279
}
272-
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> processor, Publisher<? extends T> publisher) {
280+
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> proc, Publisher<? extends T> pub) {
273281
// ignore
274282
}
275283
@Override
@@ -280,7 +288,7 @@ public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
280288

281289
COMPLETED {
282290
@Override
283-
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> processor, Publisher<? extends T> publisher) {
291+
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> proc, Publisher<? extends T> pub) {
284292
// ignore
285293
}
286294
@Override
@@ -294,11 +302,11 @@ public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
294302
};
295303

296304

297-
public <T> void onSubscribe(AbstractListenerWriteFlushProcessor<T> processor, Subscription subscription) {
305+
public <T> void onSubscribe(AbstractListenerWriteFlushProcessor<T> proc, Subscription subscription) {
298306
subscription.cancel();
299307
}
300308

301-
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> processor, Publisher<? extends T> publisher) {
309+
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> proc, Publisher<? extends T> pub) {
302310
throw new IllegalStateException(toString());
303311
}
304312

@@ -326,7 +334,7 @@ public <T> void onFlushPossible(AbstractListenerWriteFlushProcessor<T> processor
326334

327335
/**
328336
* Subscriber to receive and delegate completion notifications for from
329-
* the current Publisher, i.e. within the current flush boundary.
337+
* the current Publisher, i.e. for the current flush boundary.
330338
*/
331339
private static class WriteResultSubscriber implements Subscriber<Void> {
332340

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
5858
private final WriteResultPublisher resultPublisher = new WriteResultPublisher();
5959

6060

61-
// Subscriber methods and methods to notify of async I/O events...
61+
// Subscriber methods and async I/O notification methods...
6262

6363
@Override
6464
public final void onSubscribe(Subscription subscription) {
@@ -72,8 +72,8 @@ public final void onNext(T data) {
7272
}
7373

7474
/**
75-
* Notify of an error. This can come from the upstream write Publisher or
76-
* from sub-classes as a result of an I/O error.
75+
* Error signal from the upstream, write Publisher. This is also used by
76+
* sub-classes to delegate error notifications from the container.
7777
*/
7878
@Override
7979
public final void onError(Throwable ex) {
@@ -84,36 +84,45 @@ public final void onError(Throwable ex) {
8484
}
8585

8686
/**
87-
* Notify of completion. This can come from the upstream write Publisher or
88-
* from sub-classes as a result of an I/O completion event.
87+
* Completion signal from the upstream, write Publisher. This is also used
88+
* by sub-classes to delegate completion notifications from the container.
8989
*/
9090
@Override
9191
public final void onComplete() {
9292
logger.trace("Received onComplete");
9393
this.state.get().onComplete(this);
9494
}
9595

96+
/**
97+
* Invoked when writing is possible, either in the same thread after a check
98+
* via {@link #isWritePossible()}, or as a callback from the underlying
99+
* container.
100+
*/
96101
public final void onWritePossible() {
97102
this.logger.trace("Received onWritePossible");
98103
this.state.get().onWritePossible(this);
99104
}
100105

106+
/**
107+
* Invoked during an error or completion callback from the underlying
108+
* container to cancel the upstream subscription.
109+
*/
101110
public void cancel() {
102111
this.logger.trace("Received request to cancel");
103112
if (this.subscription != null) {
104113
this.subscription.cancel();
105114
}
106115
}
107116

108-
// Publisher method...
117+
// Publisher implementation for result notifications...
109118

110119
@Override
111120
public final void subscribe(Subscriber<? super Void> subscriber) {
112121
this.resultPublisher.subscribe(subscriber);
113122
}
114123

115124

116-
// Methods for sub-classes to implement or override...
125+
// Write API methods to be implemented or template methods to override...
117126

118127
/**
119128
* Whether the given data item has any content to write.
@@ -122,8 +131,9 @@ public final void subscribe(Subscriber<? super Void> subscriber) {
122131
protected abstract boolean isDataEmpty(T data);
123132

124133
/**
125-
* Called when a data item is received via {@link Subscriber#onNext(Object)}.
126-
* The default implementation saves the data for writing when possible.
134+
* Template method invoked after a data item to write is received via
135+
* {@link Subscriber#onNext(Object)}. The default implementation saves the
136+
* data item for writing once that is possible.
127137
*/
128138
protected void dataReceived(T data) {
129139
if (this.currentData != null) {
@@ -151,27 +161,31 @@ protected void dataReceived(T data) {
151161
protected abstract boolean write(T data) throws IOException;
152162

153163
/**
154-
* Suspend writing. Defaults to no-op.
164+
* Invoked after the current data has been written and before requesting
165+
* the next item from the upstream, write Publisher.
166+
* <p>The default implementation is a no-op.
155167
*/
156168
protected void suspendWriting() {
157169
}
158170

159171
/**
160-
* Invoked when writing is complete. Defaults to no-op.
172+
* Invoked after onComplete or onError notification.
173+
* <p>The default implementation is a no-op.
161174
*/
162175
protected void writingComplete() {
163176
}
164177

165178
/**
166-
* Invoked when an error happens while writing.
167-
* <p>Defaults to no-op. Servlet 3.1 based implementations will receive
168-
* {@code javax.servlet.WriteListener#onError(Throwable)} event.
179+
* Invoked when an I/O error occurs during a write. Sub-classes may choose
180+
* to ignore this if they know the underlying API will provide an error
181+
* notification in a container thread.
182+
* <p>Defaults to no-op.
169183
*/
170184
protected void writingFailed(Throwable ex) {
171185
}
172186

173187

174-
// Private methods for use in State...
188+
// Private methods for use from State's...
175189

176190
private boolean changeState(State oldState, State newState) {
177191
boolean result = this.state.compareAndSet(oldState, newState);

0 commit comments

Comments
 (0)