@@ -52,10 +52,10 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
52
52
53
53
private volatile long demand ;
54
54
55
- private volatile boolean publisherCompleted ;
55
+ private volatile boolean completionBeforeDemand ;
56
56
57
57
@ Nullable
58
- private volatile Throwable publisherError ;
58
+ private volatile Throwable errorBeforeDemand ;
59
59
60
60
@ SuppressWarnings ("rawtypes" )
61
61
private static final AtomicLongFieldUpdater <AbstractListenerReadPublisher > DEMAND_FIELD_UPDATER =
@@ -76,31 +76,22 @@ public void subscribe(Subscriber<? super T> subscriber) {
76
76
}
77
77
78
78
79
- // Listener delegation methods ...
79
+ // Methods for sub-classes to delegate to, when async I/O events occur ...
80
80
81
- /**
82
- * Listeners can call this to notify when reading is possible.
83
- */
84
81
public final void onDataAvailable () {
85
82
if (this .logger .isTraceEnabled ()) {
86
83
this .logger .trace (this .state + " onDataAvailable" );
87
84
}
88
85
this .state .get ().onDataAvailable (this );
89
86
}
90
87
91
- /**
92
- * Listeners can call this to notify when all data has been read.
93
- */
94
88
public void onAllDataRead () {
95
89
if (this .logger .isTraceEnabled ()) {
96
90
this .logger .trace (this .state + " onAllDataRead" );
97
91
}
98
92
this .state .get ().onAllDataRead (this );
99
93
}
100
94
101
- /**
102
- * Listeners can call this to notify when a read error has occurred.
103
- */
104
95
public final void onError (Throwable t ) {
105
96
if (this .logger .isTraceEnabled ()) {
106
97
this .logger .trace (this .state + " onError: " + t );
@@ -109,11 +100,17 @@ public final void onError(Throwable t) {
109
100
}
110
101
111
102
103
+ // Methods for sub-classes to implement...
104
+
105
+ /**
106
+ * Check if data is available, calling {@link #onDataAvailable()} either
107
+ * immediately or later when reading is possible.
108
+ */
112
109
protected abstract void checkOnDataAvailable ();
113
110
114
111
/**
115
- * Reads a data from the input, if possible.
116
- * @return the data that was read; or {@code null}
112
+ * Read once from the input, if possible.
113
+ * @return the item that was read; or {@code null}
117
114
*/
118
115
@ Nullable
119
116
protected abstract T read () throws IOException ;
@@ -125,14 +122,17 @@ protected void suspendReading() {
125
122
}
126
123
127
124
125
+ // Private methods for use in State...
126
+
128
127
/**
129
- * Read and publish data from the input. Continue till there is no more
130
- * demand or there is no more data to be read.
131
- * @return {@code true} if there is more demand; {@code false} otherwise
128
+ * Read and publish data one at a time until there is no more data, no more
129
+ * demand, or perhaps we completed in the mean time.
130
+ * @return {@code true} if there is more demand; {@code false} if there is
131
+ * no more demand or we have completed.
132
132
*/
133
133
private boolean readAndPublish () throws IOException {
134
134
long r ;
135
- while ((r = demand ) > 0 && !publisherCompleted ) {
135
+ while ((r = this . demand ) > 0 && !this . state . get (). equals ( State . COMPLETED ) ) {
136
136
T data = read ();
137
137
if (data != null ) {
138
138
if (r != Long .MAX_VALUE ) {
@@ -152,96 +152,95 @@ private boolean changeState(State oldState, State newState) {
152
152
return this .state .compareAndSet (oldState , newState );
153
153
}
154
154
155
+ private Subscription createSubscription () {
156
+ return new ReadSubscription ();
157
+ }
155
158
156
- private static final class ReadSubscription implements Subscription {
157
159
158
- private final AbstractListenerReadPublisher <?> publisher ;
160
+ /**
161
+ * Subscription that delegates signals to State.
162
+ */
163
+ private final class ReadSubscription implements Subscription {
159
164
160
- public ReadSubscription (AbstractListenerReadPublisher <?> publisher ) {
161
- this .publisher = publisher ;
162
- }
163
165
164
166
@ Override
165
167
public final void request (long n ) {
166
- if (this . publisher . logger .isTraceEnabled ()) {
167
- this . publisher . logger .trace (state () + " request: " + n );
168
+ if (logger .isTraceEnabled ()) {
169
+ logger .trace (state + " request: " + n );
168
170
}
169
- state ().request (this . publisher , n );
171
+ state . get ().request (AbstractListenerReadPublisher . this , n );
170
172
}
171
173
172
174
@ Override
173
175
public final void cancel () {
174
- if (this . publisher . logger .isTraceEnabled ()) {
175
- this . publisher . logger .trace (state () + " cancel" );
176
+ if (logger .isTraceEnabled ()) {
177
+ logger .trace (state + " cancel" );
176
178
}
177
- state ().cancel (this .publisher );
178
- }
179
-
180
- private State state () {
181
- return this .publisher .state .get ();
179
+ state .get ().cancel (AbstractListenerReadPublisher .this );
182
180
}
183
181
}
184
182
185
183
186
184
/**
187
- * Represents a state for the {@link Publisher} to be in. The following figure
188
- * indicate the four different states that exist, and the relationships between them.
189
- *
190
- * <pre>
191
- * UNSUBSCRIBED
192
- * |
193
- * v
194
- * NO_DEMAND -------------------> DEMAND
195
- * | ^ ^ |
196
- * | | | |
197
- * | --------- READING <----- |
198
- * | | |
199
- * | v |
200
- * ------------> COMPLETED <---------
185
+ * Represents a state for the {@link Publisher} to be in.
186
+ * <p><pre>
187
+ * UNSUBSCRIBED
188
+ * |
189
+ * v
190
+ * SUBSCRIBING
191
+ * |
192
+ * v
193
+ * +---- NO_DEMAND ---------------> DEMAND ---+
194
+ * | ^ ^ |
195
+ * | | | |
196
+ * | +------- READING <--------+ |
197
+ * | | |
198
+ * | v |
199
+ * +--------------> COMPLETED <---------------+
201
200
* </pre>
202
- * Refer to the individual states for more information.
203
201
*/
204
202
private enum State {
205
203
206
- /**
207
- * The initial unsubscribed state. Will respond to {@link
208
- * #subscribe(AbstractListenerReadPublisher, Subscriber)} by
209
- * changing state to {@link #NO_DEMAND}.
210
- */
211
204
UNSUBSCRIBED {
212
205
@ Override
213
206
<T > void subscribe (AbstractListenerReadPublisher <T > publisher , Subscriber <? super T > subscriber ) {
214
207
Assert .notNull (publisher , "Publisher must not be null" );
215
208
Assert .notNull (subscriber , "Subscriber must not be null" );
216
209
if (publisher .changeState (this , SUBSCRIBING )) {
217
- Subscription subscription = new ReadSubscription ( publisher );
210
+ Subscription subscription = publisher . createSubscription ( );
218
211
publisher .subscriber = subscriber ;
219
212
subscriber .onSubscribe (subscription );
220
213
publisher .changeState (SUBSCRIBING , NO_DEMAND );
221
- if (publisher .publisherCompleted ) {
222
- publisher .onAllDataRead ();
214
+ // Now safe to check "beforeDemand" flags, they won't change once in NO_DEMAND
215
+ if (publisher .completionBeforeDemand ) {
216
+ publisher .state .get ().onAllDataRead (publisher );
223
217
}
224
- Throwable publisherError = publisher .publisherError ;
225
- if (publisherError != null ) {
226
- publisher .onError (publisherError );
218
+ Throwable ex = publisher .errorBeforeDemand ;
219
+ if (ex != null ) {
220
+ publisher .state . get (). onError (publisher , ex );
227
221
}
228
222
}
229
223
else {
230
- throw new IllegalStateException (toString ());
224
+ throw new IllegalStateException ("Failed to transition to SUBSCRIBING, " +
225
+ "subscriber: " + subscriber );
231
226
}
232
227
}
233
228
234
229
@ Override
235
230
<T > void onAllDataRead (AbstractListenerReadPublisher <T > publisher ) {
236
- publisher .publisherCompleted = true ;
231
+ publisher .completionBeforeDemand = true ;
237
232
}
238
233
239
234
@ Override
240
- <T > void onError (AbstractListenerReadPublisher <T > publisher , Throwable t ) {
241
- publisher .publisherError = t ;
235
+ <T > void onError (AbstractListenerReadPublisher <T > publisher , Throwable ex ) {
236
+ publisher .errorBeforeDemand = ex ;
242
237
}
243
238
},
244
239
240
+ /**
241
+ * Very brief state where we know we have a Subscriber but must not
242
+ * send onComplete and onError until we after onSubscribe.
243
+ */
245
244
SUBSCRIBING {
246
245
<T > void request (AbstractListenerReadPublisher <T > publisher , long n ) {
247
246
if (Operators .validate (n )) {
@@ -254,21 +253,15 @@ <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
254
253
255
254
@ Override
256
255
<T > void onAllDataRead (AbstractListenerReadPublisher <T > publisher ) {
257
- publisher .publisherCompleted = true ;
256
+ publisher .completionBeforeDemand = true ;
258
257
}
259
258
260
259
@ Override
261
- <T > void onError (AbstractListenerReadPublisher <T > publisher , Throwable t ) {
262
- publisher .publisherError = t ;
260
+ <T > void onError (AbstractListenerReadPublisher <T > publisher , Throwable ex ) {
261
+ publisher .errorBeforeDemand = ex ;
263
262
}
264
263
},
265
264
266
- /**
267
- * State that gets entered when there is no demand. Responds to {@link
268
- * #request(AbstractListenerReadPublisher, long)} by increasing the demand,
269
- * changing state to {@link #DEMAND} and will check whether there
270
- * is data available for reading.
271
- */
272
265
NO_DEMAND {
273
266
@ Override
274
267
<T > void request (AbstractListenerReadPublisher <T > publisher , long n ) {
@@ -277,21 +270,17 @@ <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
277
270
if (publisher .changeState (this , DEMAND )) {
278
271
publisher .checkOnDataAvailable ();
279
272
}
273
+ // or else we completed at the same time...
280
274
}
281
275
}
282
276
},
283
277
284
- /**
285
- * State that gets entered when there is demand. Responds to
286
- * {@link #onDataAvailable(AbstractListenerReadPublisher)} by
287
- * reading the available data. The state will be changed to
288
- * {@link #NO_DEMAND} if there is no demand.
289
- */
290
278
DEMAND {
291
279
@ Override
292
280
<T > void request (AbstractListenerReadPublisher <T > publisher , long n ) {
293
281
if (Operators .validate (n )) {
294
282
Operators .addCap (DEMAND_FIELD_UPDATER , publisher , n );
283
+ // Did a concurrent read transition to NO_DEMAND just before us?
295
284
if (publisher .changeState (NO_DEMAND , DEMAND )) {
296
285
publisher .checkOnDataAvailable ();
297
286
}
@@ -304,32 +293,38 @@ <T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) {
304
293
if (!read (publisher )) {
305
294
return ;
306
295
}
296
+ // Maybe demand arrived between readAndPublish and READING->NO_DEMAND?
307
297
long r = publisher .demand ;
308
298
if (r == 0 || publisher .changeState (NO_DEMAND , this )) {
309
299
break ;
310
300
}
311
301
}
312
302
}
313
303
304
+ /**
305
+ * @return whether to exit the read loop; false means stop trying
306
+ * to read, true means check demand one more time.
307
+ */
314
308
<T > boolean read (AbstractListenerReadPublisher <T > publisher ) {
315
309
if (publisher .changeState (this , READING )) {
316
310
try {
317
311
boolean demandAvailable = publisher .readAndPublish ();
318
312
if (demandAvailable ) {
319
313
if (publisher .changeState (READING , DEMAND )) {
320
314
publisher .checkOnDataAvailable ();
321
- return false ;
322
315
}
323
316
}
324
317
else if (publisher .changeState (READING , NO_DEMAND )) {
325
318
publisher .suspendReading ();
319
+ return true ;
326
320
}
327
321
}
328
322
catch (IOException ex ) {
329
323
publisher .onError (ex );
330
324
}
331
- return true ;
332
325
}
326
+ // Either competing onDataAvailable calls (via request or container callback)
327
+ // Or a concurrent completion
333
328
return false ;
334
329
}
335
330
},
@@ -339,16 +334,14 @@ else if (publisher.changeState(READING, NO_DEMAND)) {
339
334
<T > void request (AbstractListenerReadPublisher <T > publisher , long n ) {
340
335
if (Operators .validate (n )) {
341
336
Operators .addCap (DEMAND_FIELD_UPDATER , publisher , n );
337
+ // Did a concurrent read transition to NO_DEMAND just before us?
342
338
if (publisher .changeState (NO_DEMAND , DEMAND )) {
343
339
publisher .checkOnDataAvailable ();
344
340
}
345
341
}
346
342
}
347
343
},
348
344
349
- /**
350
- * The terminal completed state. Does not respond to any events.
351
- */
352
345
COMPLETED {
353
346
@ Override
354
347
<T > void request (AbstractListenerReadPublisher <T > publisher , long n ) {
@@ -377,10 +370,7 @@ <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
377
370
}
378
371
379
372
<T > void cancel (AbstractListenerReadPublisher <T > publisher ) {
380
- if (publisher .changeState (this , COMPLETED )) {
381
- publisher .publisherCompleted = true ;
382
- }
383
- else {
373
+ if (!publisher .changeState (this , COMPLETED )) {
384
374
publisher .state .get ().cancel (publisher );
385
375
}
386
376
}
@@ -391,7 +381,6 @@ <T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) {
391
381
392
382
<T > void onAllDataRead (AbstractListenerReadPublisher <T > publisher ) {
393
383
if (publisher .changeState (this , COMPLETED )) {
394
- publisher .publisherCompleted = true ;
395
384
if (publisher .subscriber != null ) {
396
385
publisher .subscriber .onComplete ();
397
386
}
0 commit comments