@@ -224,6 +224,23 @@ private void changeToDemandState(State oldState) {
224
224
}
225
225
}
226
226
227
+ private void handleCompletionOrErrorBeforeDemand () {
228
+ State state = this .state .get ();
229
+ if (!state .equals (State .UNSUBSCRIBED ) && !state .equals (State .SUBSCRIBING )) {
230
+ if (this .completionBeforeDemand ) {
231
+ rsReadLogger .trace (getLogPrefix () + "Completed before demand" );
232
+ this .state .get ().onAllDataRead (this );
233
+ }
234
+ Throwable ex = this .errorBeforeDemand ;
235
+ if (ex != null ) {
236
+ if (rsReadLogger .isTraceEnabled ()) {
237
+ rsReadLogger .trace (getLogPrefix () + "Completed with error before demand: " + ex );
238
+ }
239
+ this .state .get ().onError (this , ex );
240
+ }
241
+ }
242
+ }
243
+
227
244
private Subscription createSubscription () {
228
245
return new ReadSubscription ();
229
246
}
@@ -283,7 +300,7 @@ <T> void subscribe(AbstractListenerReadPublisher<T> publisher, Subscriber<? supe
283
300
publisher .subscriber = subscriber ;
284
301
subscriber .onSubscribe (subscription );
285
302
publisher .changeState (SUBSCRIBING , NO_DEMAND );
286
- handleCompletionOrErrorBeforeDemand (publisher );
303
+ publisher . handleCompletionOrErrorBeforeDemand ();
287
304
}
288
305
else {
289
306
throw new IllegalStateException ("Failed to transition to SUBSCRIBING, " +
@@ -294,30 +311,13 @@ <T> void subscribe(AbstractListenerReadPublisher<T> publisher, Subscriber<? supe
294
311
@ Override
295
312
<T > void onAllDataRead (AbstractListenerReadPublisher <T > publisher ) {
296
313
publisher .completionBeforeDemand = true ;
297
- handleCompletionOrErrorBeforeDemand (publisher );
314
+ publisher . handleCompletionOrErrorBeforeDemand ();
298
315
}
299
316
300
317
@ Override
301
318
<T > void onError (AbstractListenerReadPublisher <T > publisher , Throwable ex ) {
302
319
publisher .errorBeforeDemand = ex ;
303
- handleCompletionOrErrorBeforeDemand (publisher );
304
- }
305
-
306
- private <T > void handleCompletionOrErrorBeforeDemand (AbstractListenerReadPublisher <T > publisher ) {
307
- if (publisher .state .get ().equals (NO_DEMAND )) {
308
- if (publisher .completionBeforeDemand ) {
309
- rsReadLogger .trace (publisher .getLogPrefix () + "Completed before demand" );
310
- publisher .state .get ().onAllDataRead (publisher );
311
- }
312
- Throwable ex = publisher .errorBeforeDemand ;
313
- if (ex != null ) {
314
- if (rsReadLogger .isTraceEnabled ()) {
315
- String prefix = publisher .getLogPrefix ();
316
- rsReadLogger .trace (prefix + "Completed with error before demand: " + ex );
317
- }
318
- publisher .state .get ().onError (publisher , ex );
319
- }
320
- }
320
+ publisher .handleCompletionOrErrorBeforeDemand ();
321
321
}
322
322
},
323
323
@@ -337,11 +337,13 @@ <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
337
337
@ Override
338
338
<T > void onAllDataRead (AbstractListenerReadPublisher <T > publisher ) {
339
339
publisher .completionBeforeDemand = true ;
340
+ publisher .handleCompletionOrErrorBeforeDemand ();
340
341
}
341
342
342
343
@ Override
343
344
<T > void onError (AbstractListenerReadPublisher <T > publisher , Throwable ex ) {
344
345
publisher .errorBeforeDemand = ex ;
346
+ publisher .handleCompletionOrErrorBeforeDemand ();
345
347
}
346
348
},
347
349
0 commit comments