@@ -69,34 +69,27 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
69
69
70
70
@ Override
71
71
public void subscribe (Subscriber <? super T > subscriber ) {
72
- if (this .logger .isTraceEnabled ()) {
73
- this .logger .trace (this .state + " subscribe: " + subscriber );
74
- }
75
72
this .state .get ().subscribe (this , subscriber );
76
73
}
77
74
78
75
79
76
// Methods for sub-classes to delegate to, when async I/O events occur...
80
77
81
78
public final void onDataAvailable () {
82
- if (this .logger .isTraceEnabled ()) {
83
- this .logger .trace (this .state + " onDataAvailable" );
84
- }
79
+ this .logger .trace ("I/O event onDataAvailable" );
85
80
this .state .get ().onDataAvailable (this );
86
81
}
87
82
88
83
public void onAllDataRead () {
89
- if (this .logger .isTraceEnabled ()) {
90
- this .logger .trace (this .state + " onAllDataRead" );
91
- }
84
+ this .logger .trace ("I/O event onAllDataRead" );
92
85
this .state .get ().onAllDataRead (this );
93
86
}
94
87
95
- public final void onError (Throwable t ) {
88
+ public final void onError (Throwable ex ) {
96
89
if (this .logger .isTraceEnabled ()) {
97
- this .logger .trace (this . state + " onError: " + t );
90
+ this .logger .trace ("I/O event onError: " + ex );
98
91
}
99
- this .state .get ().onError (this , t );
92
+ this .state .get ().onError (this , ex );
100
93
}
101
94
102
95
@@ -142,18 +135,35 @@ private boolean readAndPublish() throws IOException {
142
135
if (r != Long .MAX_VALUE ) {
143
136
DEMAND_FIELD_UPDATER .addAndGet (this , -1L );
144
137
}
145
- Assert .state (this .subscriber != null , "No subscriber" );
146
- this .subscriber .onNext (data );
138
+ Subscriber <? super T > subscriber = this .subscriber ;
139
+ Assert .state (subscriber != null , "No subscriber" );
140
+ if (logger .isTraceEnabled ()) {
141
+ logger .trace ("Data item read, publishing.." );
142
+ }
143
+ subscriber .onNext (data );
147
144
}
148
145
else {
146
+ if (logger .isTraceEnabled ()) {
147
+ logger .trace ("No more data to read" );
148
+ }
149
149
return true ;
150
150
}
151
151
}
152
152
return false ;
153
153
}
154
154
155
155
private boolean changeState (State oldState , State newState ) {
156
- return this .state .compareAndSet (oldState , newState );
156
+ boolean result = this .state .compareAndSet (oldState , newState );
157
+ if (result && logger .isTraceEnabled ()) {
158
+ logger .trace (oldState + " -> " + newState );
159
+ }
160
+ return result ;
161
+ }
162
+
163
+ private void changeToDemandState (State oldState ) {
164
+ if (changeState (oldState , State .DEMAND )) {
165
+ checkOnDataAvailable ();
166
+ }
157
167
}
158
168
159
169
private Subscription createSubscription () {
@@ -170,15 +180,15 @@ private final class ReadSubscription implements Subscription {
170
180
@ Override
171
181
public final void request (long n ) {
172
182
if (logger .isTraceEnabled ()) {
173
- logger .trace (state + " request: " + n );
183
+ logger .trace ("Signal request( " + n + ")" );
174
184
}
175
185
state .get ().request (AbstractListenerReadPublisher .this , n );
176
186
}
177
187
178
188
@ Override
179
189
public final void cancel () {
180
190
if (logger .isTraceEnabled ()) {
181
- logger .trace (state + " cancel" );
191
+ logger .trace ("Signal cancel() " );
182
192
}
183
193
state .get ().cancel (AbstractListenerReadPublisher .this );
184
194
}
@@ -217,10 +227,14 @@ <T> void subscribe(AbstractListenerReadPublisher<T> publisher, Subscriber<? supe
217
227
publisher .changeState (SUBSCRIBING , NO_DEMAND );
218
228
// Now safe to check "beforeDemand" flags, they won't change once in NO_DEMAND
219
229
if (publisher .completionBeforeDemand ) {
230
+ publisher .logger .trace ("Completed before demand" );
220
231
publisher .state .get ().onAllDataRead (publisher );
221
232
}
222
233
Throwable ex = publisher .errorBeforeDemand ;
223
234
if (ex != null ) {
235
+ if (publisher .logger .isTraceEnabled ()) {
236
+ publisher .logger .trace ("Completed with error before demand: " + ex );
237
+ }
224
238
publisher .state .get ().onError (publisher , ex );
225
239
}
226
240
}
@@ -249,9 +263,7 @@ <T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
249
263
<T > void request (AbstractListenerReadPublisher <T > publisher , long n ) {
250
264
if (Operators .validate (n )) {
251
265
Operators .addCap (DEMAND_FIELD_UPDATER , publisher , n );
252
- if (publisher .changeState (this , DEMAND )) {
253
- publisher .checkOnDataAvailable ();
254
- }
266
+ publisher .changeToDemandState (this );
255
267
}
256
268
}
257
269
@@ -271,10 +283,7 @@ <T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
271
283
<T > void request (AbstractListenerReadPublisher <T > publisher , long n ) {
272
284
if (Operators .validate (n )) {
273
285
Operators .addCap (DEMAND_FIELD_UPDATER , publisher , n );
274
- if (publisher .changeState (this , DEMAND )) {
275
- publisher .checkOnDataAvailable ();
276
- }
277
- // or else we completed at the same time...
286
+ publisher .changeToDemandState (this );
278
287
}
279
288
}
280
289
},
@@ -285,9 +294,7 @@ <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
285
294
if (Operators .validate (n )) {
286
295
Operators .addCap (DEMAND_FIELD_UPDATER , publisher , n );
287
296
// Did a concurrent read transition to NO_DEMAND just before us?
288
- if (publisher .changeState (NO_DEMAND , this )) {
289
- publisher .checkOnDataAvailable ();
290
- }
297
+ publisher .changeToDemandState (NO_DEMAND );
291
298
}
292
299
}
293
300
@@ -297,17 +304,15 @@ <T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) {
297
304
try {
298
305
boolean demandAvailable = publisher .readAndPublish ();
299
306
if (demandAvailable ) {
300
- if (publisher .changeState (READING , DEMAND )) {
301
- publisher .checkOnDataAvailable ();
302
- }
307
+ publisher .changeToDemandState (READING );
303
308
}
304
309
else {
305
310
publisher .readingPaused ();
306
311
if (publisher .changeState (READING , NO_DEMAND )) {
307
312
// Demand may have arrived since readAndPublish returned
308
313
long r = publisher .demand ;
309
- if (r > 0 && publisher . changeState ( NO_DEMAND , this ) ) {
310
- publisher .checkOnDataAvailable ( );
314
+ if (r > 0 ) {
315
+ publisher .changeToDemandState ( NO_DEMAND );
311
316
}
312
317
}
313
318
}
@@ -326,9 +331,7 @@ <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
326
331
if (Operators .validate (n )) {
327
332
Operators .addCap (DEMAND_FIELD_UPDATER , publisher , n );
328
333
// Did a concurrent read transition to NO_DEMAND just before us?
329
- if (publisher .changeState (NO_DEMAND , DEMAND )) {
330
- publisher .checkOnDataAvailable ();
331
- }
334
+ publisher .changeToDemandState (NO_DEMAND );
332
335
}
333
336
}
334
337
},
@@ -372,8 +375,9 @@ <T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) {
372
375
373
376
<T > void onAllDataRead (AbstractListenerReadPublisher <T > publisher ) {
374
377
if (publisher .changeState (this , COMPLETED )) {
375
- if (publisher .subscriber != null ) {
376
- publisher .subscriber .onComplete ();
378
+ Subscriber <? super T > s = publisher .subscriber ;
379
+ if (s != null ) {
380
+ s .onComplete ();
377
381
}
378
382
}
379
383
else {
@@ -383,8 +387,9 @@ <T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
383
387
384
388
<T > void onError (AbstractListenerReadPublisher <T > publisher , Throwable t ) {
385
389
if (publisher .changeState (this , COMPLETED )) {
386
- if (publisher .subscriber != null ) {
387
- publisher .subscriber .onError (t );
390
+ Subscriber <? super T > s = publisher .subscriber ;
391
+ if (s != null ) {
392
+ s .onError (t );
388
393
}
389
394
}
390
395
else {
0 commit comments