@@ -182,62 +182,42 @@ protected void schedule() {
182182 void pollQueue () {
183183 int emitted = 0 ;
184184 do {
185- /*
186- * Set to 1 otherwise it could have grown very large while in the last poll loop
187- * and then we can end up looping all those times again here before exiting even once we've drained
188- */
189185 counter = 1 ;
190-
191- while (!scheduledUnsubscribe .isUnsubscribed ()) {
186+ long produced = 0 ;
187+ long r = requested ;
188+ while (!child .isUnsubscribed ()) {
189+ Throwable error ;
192190 if (finished ) {
193- // only read volatile error once
194- Throwable err = error ;
195- if (err != null ) {
196- // clear the queue to enable gc
191+ if ((error = this .error ) != null ) {
192+ // errors shortcut the queue so
193+ // release the elements in the queue for gc
197194 queue .clear ();
198- // even if there are onNext in the queue we eagerly notify of error
199- child .onError (err );
195+ child .onError (error );
200196 return ;
201- } else if (queue .isEmpty ()) {
197+ } else
198+ if (queue .isEmpty ()) {
202199 child .onCompleted ();
203200 return ;
204201 }
205202 }
206- if (REQUESTED . getAndDecrement ( this ) != 0 ) {
203+ if (r > 0 ) {
207204 Object o = queue .poll ();
208- if (o == null ) {
209- // nothing in queue (but be careful, something could be added concurrently right now)
210- if (finished ) {
211- // only read volatile error once
212- Throwable err = error ;
213- if (err != null ) {
214- // clear the queue to enable gc
215- queue .clear ();
216- // even if there are onNext in the queue we eagerly notify of error
217- child .onError (err );
218- return ;
219- } else if (queue .isEmpty ()) {
220- child .onCompleted ();
221- return ;
222- }
223- }
224- BackpressureUtils .getAndAddRequest (REQUESTED , this , 1 );
225- break ;
205+ if (o != null ) {
206+ child .onNext (on .getValue (o ));
207+ r --;
208+ emitted ++;
209+ produced ++;
226210 } else {
227- if (!on .accept (child , o )) {
228- // non-terminal event so let's increment count
229- emitted ++;
230- }
211+ break ;
231212 }
232213 } else {
233- // we hit the end ... so increment back to 0 again
234- BackpressureUtils .getAndAddRequest (REQUESTED , this , 1 );
235214 break ;
236215 }
237216 }
217+ if (produced > 0 ) {
218+ REQUESTED .addAndGet (this , -produced );
219+ }
238220 } while (COUNTER_UPDATER .decrementAndGet (this ) > 0 );
239-
240- // request the number of items that we emitted in this poll loop
241221 if (emitted > 0 ) {
242222 request (emitted );
243223 }
0 commit comments