35
35
* Servlet 3.1 and Undertow support.
36
36
*
37
37
* @author Arjen Poutsma
38
+ * @author Violeta Georgieva
38
39
* @since 5.0
39
40
* @see ServletServerHttpRequest
40
41
* @see UndertowHttpHandlerAdapter
41
42
* @see ServerHttpResponse#writeAndFlushWith(Publisher)
42
43
*/
43
- abstract class AbstractResponseBodyFlushProcessor
44
- implements Processor <Publisher <DataBuffer >, Void > {
44
+ abstract class AbstractResponseBodyFlushProcessor implements Processor <Publisher <DataBuffer >, Void > {
45
45
46
46
protected final Log logger = LogFactory .getLog (getClass ());
47
47
48
- private final ResponseBodyWriteResultPublisher publisherDelegate =
49
- new ResponseBodyWriteResultPublisher ();
48
+ private final ResponseBodyWriteResultPublisher resultPublisher = new ResponseBodyWriteResultPublisher ();
50
49
51
- private final AtomicReference <State > state =
52
- new AtomicReference <>(State .UNSUBSCRIBED );
50
+ private final AtomicReference <State > state = new AtomicReference <>(State .UNSUBSCRIBED );
53
51
54
52
private volatile boolean subscriberCompleted ;
55
53
56
54
private Subscription subscription ;
57
55
56
+
58
57
// Subscriber
59
58
60
59
@ Override
@@ -89,13 +88,15 @@ public final void onComplete() {
89
88
this .state .get ().onComplete (this );
90
89
}
91
90
91
+
92
92
// Publisher
93
93
94
94
@ Override
95
95
public final void subscribe (Subscriber <? super Void > subscriber ) {
96
- this .publisherDelegate .subscribe (subscriber );
96
+ this .resultPublisher .subscribe (subscriber );
97
97
}
98
98
99
+
99
100
/**
100
101
* Creates a new processor for subscribing to a body chunk.
101
102
*/
@@ -106,8 +107,9 @@ public final void subscribe(Subscriber<? super Void> subscriber) {
106
107
*/
107
108
protected abstract void flush () throws IOException ;
108
109
109
- private void cancel () {
110
- this .subscription .cancel ();
110
+
111
+ private boolean changeState (State oldState , State newState ) {
112
+ return this .state .compareAndSet (oldState , newState );
111
113
}
112
114
113
115
private void writeComplete () {
@@ -118,15 +120,17 @@ private void writeComplete() {
118
120
119
121
}
120
122
121
- private boolean changeState ( State oldState , State newState ) {
122
- return this .state . compareAndSet ( oldState , newState );
123
+ private void cancel ( ) {
124
+ this .subscription . cancel ( );
123
125
}
124
126
127
+
125
128
private enum State {
129
+
126
130
UNSUBSCRIBED {
131
+
127
132
@ Override
128
- public void onSubscribe (AbstractResponseBodyFlushProcessor processor ,
129
- Subscription subscription ) {
133
+ public void onSubscribe (AbstractResponseBodyFlushProcessor processor , Subscription subscription ) {
130
134
Objects .requireNonNull (subscription , "Subscription cannot be null" );
131
135
if (processor .changeState (this , REQUESTED )) {
132
136
processor .subscription = subscription ;
@@ -138,25 +142,25 @@ public void onSubscribe(AbstractResponseBodyFlushProcessor processor,
138
142
}
139
143
},
140
144
REQUESTED {
145
+
141
146
@ Override
142
- public void onNext (AbstractResponseBodyFlushProcessor processor ,
143
- Publisher <DataBuffer > chunk ) {
147
+ public void onNext (AbstractResponseBodyFlushProcessor processor , Publisher <DataBuffer > chunk ) {
144
148
if (processor .changeState (this , RECEIVED )) {
145
- Processor <DataBuffer , Void > chunkProcessor =
146
- processor .createBodyProcessor ();
149
+ Processor <DataBuffer , Void > chunkProcessor = processor .createBodyProcessor ();
147
150
chunk .subscribe (chunkProcessor );
148
151
chunkProcessor .subscribe (new WriteSubscriber (processor ));
149
152
}
150
153
}
151
154
152
155
@ Override
153
- void onComplete (AbstractResponseBodyFlushProcessor processor ) {
156
+ public void onComplete (AbstractResponseBodyFlushProcessor processor ) {
154
157
if (processor .changeState (this , COMPLETED )) {
155
- processor .publisherDelegate .publishComplete ();
158
+ processor .resultPublisher .publishComplete ();
156
159
}
157
160
}
158
161
},
159
162
RECEIVED {
163
+
160
164
@ Override
161
165
public void writeComplete (AbstractResponseBodyFlushProcessor processor ) {
162
166
try {
@@ -169,7 +173,7 @@ public void writeComplete(AbstractResponseBodyFlushProcessor processor) {
169
173
170
174
if (processor .subscriberCompleted ) {
171
175
if (processor .changeState (this , COMPLETED )) {
172
- processor .publisherDelegate .publishComplete ();
176
+ processor .resultPublisher .publishComplete ();
173
177
}
174
178
}
175
179
else {
@@ -180,11 +184,12 @@ public void writeComplete(AbstractResponseBodyFlushProcessor processor) {
180
184
}
181
185
182
186
@ Override
183
- void onComplete (AbstractResponseBodyFlushProcessor processor ) {
187
+ public void onComplete (AbstractResponseBodyFlushProcessor processor ) {
184
188
processor .subscriberCompleted = true ;
185
189
}
186
190
},
187
191
COMPLETED {
192
+
188
193
@ Override
189
194
public void onNext (AbstractResponseBodyFlushProcessor processor ,
190
195
Publisher <DataBuffer > publisher ) {
@@ -193,40 +198,39 @@ public void onNext(AbstractResponseBodyFlushProcessor processor,
193
198
}
194
199
195
200
@ Override
196
- void onError (AbstractResponseBodyFlushProcessor processor , Throwable t ) {
201
+ public void onError (AbstractResponseBodyFlushProcessor processor , Throwable t ) {
197
202
// ignore
198
203
}
199
204
200
205
@ Override
201
- void onComplete (AbstractResponseBodyFlushProcessor processor ) {
206
+ public void onComplete (AbstractResponseBodyFlushProcessor processor ) {
202
207
// ignore
203
208
}
204
209
};
205
210
206
- public void onSubscribe (AbstractResponseBodyFlushProcessor processor ,
207
- Subscription subscription ) {
211
+ public void onSubscribe (AbstractResponseBodyFlushProcessor processor , Subscription subscription ) {
208
212
subscription .cancel ();
209
213
}
210
214
211
- public void onNext (AbstractResponseBodyFlushProcessor processor ,
212
- Publisher <DataBuffer > publisher ) {
215
+ public void onNext (AbstractResponseBodyFlushProcessor processor , Publisher <DataBuffer > publisher ) {
213
216
throw new IllegalStateException (toString ());
214
217
}
215
218
216
- void onError (AbstractResponseBodyFlushProcessor processor , Throwable t ) {
219
+ public void onError (AbstractResponseBodyFlushProcessor processor , Throwable ex ) {
217
220
if (processor .changeState (this , COMPLETED )) {
218
- processor .publisherDelegate .publishError (t );
221
+ processor .resultPublisher .publishError (ex );
219
222
}
220
223
}
221
224
222
- void onComplete (AbstractResponseBodyFlushProcessor processor ) {
225
+ public void onComplete (AbstractResponseBodyFlushProcessor processor ) {
223
226
throw new IllegalStateException (toString ());
224
227
}
225
228
226
229
public void writeComplete (AbstractResponseBodyFlushProcessor processor ) {
227
230
// ignore
228
231
}
229
232
233
+
230
234
private static class WriteSubscriber implements Subscriber <Void > {
231
235
232
236
private final AbstractResponseBodyFlushProcessor processor ;
@@ -236,23 +240,23 @@ public WriteSubscriber(AbstractResponseBodyFlushProcessor processor) {
236
240
}
237
241
238
242
@ Override
239
- public void onSubscribe (Subscription s ) {
240
- s .request (Long .MAX_VALUE );
243
+ public void onSubscribe (Subscription subscription ) {
244
+ subscription .request (Long .MAX_VALUE );
241
245
}
242
246
243
247
@ Override
244
248
public void onNext (Void aVoid ) {
245
249
}
246
250
247
251
@ Override
248
- public void onError (Throwable t ) {
249
- processor .cancel ();
250
- processor .onError (t );
252
+ public void onError (Throwable ex ) {
253
+ this . processor .cancel ();
254
+ this . processor .onError (ex );
251
255
}
252
256
253
257
@ Override
254
258
public void onComplete () {
255
- processor .writeComplete ();
259
+ this . processor .writeComplete ();
256
260
}
257
261
}
258
262
}
0 commit comments