@@ -83,7 +83,8 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> implemen
8383 final NotificationLite <T > on ;
8484 final boolean delayError ;
8585 final Queue <Object > queue ;
86- final int bufferSize ;
86+ /** The emission threshold that should trigger a replenishing request. */
87+ final int limit ;
8788
8889 // the status of the current stream
8990 volatile boolean finished ;
@@ -97,6 +98,9 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> implemen
9798 * reading finished (acquire).
9899 */
99100 Throwable error ;
101+
102+ /** Remembers how many elements have been emitted before the requests run out. */
103+ long emitted ;
100104
101105 // do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
102106 // not prevent anything downstream from consuming, which will happen if the Subscription is chained
@@ -105,12 +109,16 @@ public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boo
105109 this .recursiveScheduler = scheduler .createWorker ();
106110 this .delayError = delayError ;
107111 this .on = NotificationLite .instance ();
108- this .bufferSize = (bufferSize > 0 ) ? bufferSize : RxRingBuffer .SIZE ;
112+ int calculatedSize = (bufferSize > 0 ) ? bufferSize : RxRingBuffer .SIZE ;
113+ // this formula calculates the 75% of the bufferSize, rounded up to the next integer
114+ this .limit = calculatedSize - (calculatedSize >> 2 );
109115 if (UnsafeAccess .isUnsafeAvailable ()) {
110- queue = new SpscArrayQueue <Object >(this . bufferSize );
116+ queue = new SpscArrayQueue <Object >(calculatedSize );
111117 } else {
112- queue = new SpscAtomicArrayQueue <Object >(this . bufferSize );
118+ queue = new SpscAtomicArrayQueue <Object >(calculatedSize );
113119 }
120+ // signal that this is an async operator capable of receiving this many
121+ request (calculatedSize );
114122 }
115123
116124 void init () {
@@ -133,12 +141,6 @@ public void request(long n) {
133141 localChild .add (this );
134142 }
135143
136- @ Override
137- public void onStart () {
138- // signal that this is an async operator capable of receiving this many
139- request (this .bufferSize );
140- }
141-
142144 @ Override
143145 public void onNext (final T t ) {
144146 if (isUnsubscribed () || finished ) {
@@ -180,9 +182,8 @@ protected void schedule() {
180182 // only execute this from schedule()
181183 @ Override
182184 public void call () {
183- long emitted = 0L ;
184-
185185 long missed = 1L ;
186+ long currentEmission = emitted ;
186187
187188 // these are accessed in a tight loop around atomics so
188189 // loading them into local variables avoids the mandatory re-reading
@@ -197,7 +198,6 @@ public void call() {
197198
198199 for (;;) {
199200 long requestAmount = requested .get ();
200- long currentEmission = 0L ;
201201
202202 while (requestAmount != currentEmission ) {
203203 boolean done = finished ;
@@ -215,28 +215,25 @@ public void call() {
215215 localChild .onNext (localOn .getValue (v ));
216216
217217 currentEmission ++;
218- emitted ++;
218+ if (currentEmission == limit ) {
219+ requestAmount = BackpressureUtils .produced (requested , currentEmission );
220+ request (currentEmission );
221+ currentEmission = 0L ;
222+ }
219223 }
220224
221225 if (requestAmount == currentEmission ) {
222226 if (checkTerminated (finished , q .isEmpty (), localChild , q )) {
223227 return ;
224228 }
225229 }
226-
227- if (currentEmission != 0L ) {
228- BackpressureUtils .produced (requested , currentEmission );
229- }
230-
230+
231+ emitted = currentEmission ;
231232 missed = counter .addAndGet (-missed );
232233 if (missed == 0L ) {
233234 break ;
234235 }
235236 }
236-
237- if (emitted != 0L ) {
238- request (emitted );
239- }
240237 }
241238
242239 boolean checkTerminated (boolean done , boolean isEmpty , Subscriber <? super T > a , Queue <Object > q ) {
@@ -285,4 +282,4 @@ boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a,
285282 return false ;
286283 }
287284 }
288- }
285+ }
0 commit comments