@@ -75,15 +75,19 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
7575 final NotificationLite <T > on = NotificationLite .instance ();
7676
7777 final Queue <Object > queue ;
78- volatile boolean completed = false ;
79- volatile boolean failure = false ;
78+
79+ // the status of the current stream
80+ volatile boolean finished = false ;
8081
82+ @ SuppressWarnings ("unused" )
8183 volatile long requested = 0 ;
84+
8285 @ SuppressWarnings ("rawtypes" )
8386 static final AtomicLongFieldUpdater <ObserveOnSubscriber > REQUESTED = AtomicLongFieldUpdater .newUpdater (ObserveOnSubscriber .class , "requested" );
8487
8588 @ SuppressWarnings ("unused" )
8689 volatile long counter ;
90+
8791 @ SuppressWarnings ("rawtypes" )
8892 static final AtomicLongFieldUpdater <ObserveOnSubscriber > COUNTER_UPDATER = AtomicLongFieldUpdater .newUpdater (ObserveOnSubscriber .class , "counter" );
8993
@@ -127,7 +131,7 @@ public void onStart() {
127131
128132 @ Override
129133 public void onNext (final T t ) {
130- if (isUnsubscribed () || completed ) {
134+ if (isUnsubscribed ()) {
131135 return ;
132136 }
133137 if (!queue .offer (on .next (t ))) {
@@ -139,30 +143,23 @@ public void onNext(final T t) {
139143
140144 @ Override
141145 public void onCompleted () {
142- if (isUnsubscribed () || completed ) {
146+ if (isUnsubscribed () || finished ) {
143147 return ;
144148 }
145- if (error != null ) {
146- return ;
147- }
148- completed = true ;
149+ finished = true ;
149150 schedule ();
150151 }
151152
152153 @ Override
153154 public void onError (final Throwable e ) {
154- if (isUnsubscribed () || completed ) {
155- return ;
156- }
157- if (error != null ) {
155+ if (isUnsubscribed () || finished ) {
158156 return ;
159157 }
160158 error = e ;
161159 // unsubscribe eagerly since time will pass before the scheduled onError results in an unsubscribe event
162160 unsubscribe ();
163- // mark failure so the polling thread will skip onNext still in the queue
164- completed = true ;
165- failure = true ;
161+ finished = true ;
162+ // polling thread should skip any onNext still in the queue
166163 schedule ();
167164 }
168165
@@ -191,41 +188,51 @@ void pollQueue() {
191188 */
192189 counter = 1 ;
193190
194- // middle:
195191 while (!scheduledUnsubscribe .isUnsubscribed ()) {
196- if (failure ) {
197- child .onError (error );
198- return ;
199- } else {
200- if (requested == 0 && completed && queue .isEmpty ()) {
192+ if (finished ) {
193+ // only read volatile error once
194+ Throwable err = error ;
195+ if (err != null ) {
196+ // clear the queue to enable gc
197+ queue .clear ();
198+ // even if there are onNext in the queue we eagerly notify of error
199+ child .onError (err );
200+ return ;
201+ } else if (queue .isEmpty ()) {
201202 child .onCompleted ();
202203 return ;
203204 }
204- if (REQUESTED .getAndDecrement (this ) != 0 ) {
205- Object o = queue .poll ();
206- if (o == null ) {
207- if (completed ) {
208- if (failure ) {
209- child .onError (error );
210- } else {
211- child .onCompleted ();
212- }
205+ }
206+ if (REQUESTED .getAndDecrement (this ) != 0 ) {
207+ Object o = queue .poll ();
208+ if (o == null ) {
209+ // nothing in queue (but be careful, something could be added concurrently right now)
210+ if (finished ) {
211+ // only read volatile error once
212+ Throwable err = error ;
213+ if (err != null ) {
214+ // clear the queue to enable gc
215+ queue .clear ();
216+ // even if there are onNext in the queue we eagerly notify of error
217+ child .onError (err );
218+ return ;
219+ } else if (queue .isEmpty ()) {
220+ child .onCompleted ();
213221 return ;
214- }
215- // nothing in queue
216- REQUESTED .incrementAndGet (this );
217- break ;
218- } else {
219- if (!on .accept (child , o )) {
220- // non-terminal event so let's increment count
221- emitted ++;
222222 }
223223 }
224- } else {
225- // we hit the end ... so increment back to 0 again
226- REQUESTED .incrementAndGet (this );
224+ BackpressureUtils .getAndAddRequest (REQUESTED , this , 1 );
227225 break ;
226+ } else {
227+ if (!on .accept (child , o )) {
228+ // non-terminal event so let's increment count
229+ emitted ++;
230+ }
228231 }
232+ } else {
233+ // we hit the end ... so increment back to 0 again
234+ BackpressureUtils .getAndAddRequest (REQUESTED , this , 1 );
235+ break ;
229236 }
230237 }
231238 } while (COUNTER_UPDATER .decrementAndGet (this ) > 0 );
0 commit comments