@@ -194,7 +194,7 @@ private void handleNewSource(Observable<? extends T> t) {
194194 }
195195 MergeProducer <T > producerIfNeeded = null ;
196196 // if we have received a request then we need to respect it, otherwise we fast-path
197- if (mergeProducer .requested () != Long .MAX_VALUE ) {
197+ if (mergeProducer .requested != Long .MAX_VALUE ) {
198198 /**
199199 * <pre> {@code
200200 * With this optimization:
@@ -237,7 +237,7 @@ private void handleScalarSynchronousObservable(ScalarSynchronousObservable<? ext
237237 * } </pre>
238238 *
239239 */
240- if (mergeProducer .requested () == Long .MAX_VALUE ) {
240+ if (mergeProducer .requested == Long .MAX_VALUE ) {
241241 handleScalarSynchronousObservableWithoutRequestLimits (t );
242242 } else {
243243 handleScalarSynchronousObservableWithRequestLimits (t );
@@ -274,11 +274,11 @@ private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronou
274274 boolean moreToDrain ;
275275 boolean isReturn = false ;
276276 try {
277- long r = mergeProducer .requested () ;
277+ long r = mergeProducer .requested ;
278278 if (r > 0 ) {
279279 emitted = true ;
280280 actual .onNext (t .get ());
281- mergeProducer . getAndAdd (- 1 );
281+ MergeProducer . REQUESTED . decrementAndGet ( mergeProducer );
282282 // we handle this Observable without ever incrementing the wip or touching other machinery so just return here
283283 isReturn = true ;
284284 }
@@ -376,7 +376,7 @@ private void drainChildrenQueues() {
376376 private int drainScalarValueQueue () {
377377 RxRingBuffer svq = scalarValueQueue ;
378378 if (svq != null ) {
379- long r = mergeProducer .requested () ;
379+ long r = mergeProducer .requested ;
380380 int emittedWhileDraining = 0 ;
381381 if (r < 0 ) {
382382 // drain it all
@@ -398,7 +398,7 @@ private int drainScalarValueQueue() {
398398 }
399399 }
400400 // decrement the number we emitted from outstanding requests
401- mergeProducer . getAndAdd (-emittedWhileDraining );
401+ MergeProducer . REQUESTED . getAndAdd (mergeProducer , -emittedWhileDraining );
402402 }
403403 return emittedWhileDraining ;
404404 }
@@ -410,7 +410,7 @@ private int drainScalarValueQueue() {
410410 @ Override
411411 public Boolean call (InnerSubscriber <T > s ) {
412412 if (s .q != null ) {
413- long r = mergeProducer .requested () ;
413+ long r = mergeProducer .requested ;
414414 int emitted = s .drainQueue ();
415415 if (emitted > 0 ) {
416416 s .requestMore (emitted );
@@ -533,26 +533,19 @@ public MergeProducer(MergeSubscriber<T> ms) {
533533 this .ms = ms ;
534534 }
535535
536- private volatile long rq = 0 ;
536+ private volatile long requested = 0 ;
537537 @ SuppressWarnings ("rawtypes" )
538- static final AtomicLongFieldUpdater <MergeProducer > RQ = AtomicLongFieldUpdater .newUpdater (MergeProducer .class , "rq " );
538+ static final AtomicLongFieldUpdater <MergeProducer > REQUESTED = AtomicLongFieldUpdater .newUpdater (MergeProducer .class , "requested " );
539539
540- public long requested () {
541- return rq ;
542- }
543- public long getAndAdd (long n ) {
544- return RQ .getAndAdd (this , n );
545- }
546-
547540 @ Override
548541 public void request (long n ) {
549- if (rq == Long .MAX_VALUE ) {
542+ if (requested == Long .MAX_VALUE ) {
550543 return ;
551544 }
552545 if (n == Long .MAX_VALUE ) {
553- rq = Long .MAX_VALUE ;
546+ requested = Long .MAX_VALUE ;
554547 } else {
555- BackpressureUtils .getAndAddRequest (RQ , this , n );
548+ BackpressureUtils .getAndAddRequest (REQUESTED , this , n );
556549 if (ms .drainQueuesIfNeeded ()) {
557550 boolean sendComplete = false ;
558551 synchronized (ms ) {
@@ -675,7 +668,7 @@ private void emit(T t, boolean complete) {
675668 } else {
676669 // this needs to check q.count() as draining above may not have drained the full queue
677670 // perf tests show this to be okay, though different queue implementations could perform poorly with this
678- if (producer .requested () > 0 && q .count () == 0 ) {
671+ if (producer .requested > 0 && q .count () == 0 ) {
679672 if (complete ) {
680673 parentSubscriber .completeInner (this );
681674 } else {
@@ -686,7 +679,7 @@ private void emit(T t, boolean complete) {
686679 onError (OnErrorThrowable .addValueAsLastCause (e , t ));
687680 }
688681 emitted ++;
689- producer . getAndAdd (- 1 );
682+ MergeProducer . REQUESTED . decrementAndGet ( producer );
690683 }
691684 } else {
692685 // no requests available, so enqueue it
@@ -735,7 +728,7 @@ private void enqueue(T t, boolean complete) {
735728 private int drainRequested () {
736729 int emitted = 0 ;
737730 // drain what was requested
738- long toEmit = producer .requested () ;
731+ long toEmit = producer .requested ;
739732 Object o ;
740733 for (int i = 0 ; i < toEmit ; i ++) {
741734 o = q .poll ();
@@ -757,7 +750,7 @@ private int drainRequested() {
757750 }
758751
759752 // decrement the number we emitted from outstanding requests
760- producer . getAndAdd (-emitted );
753+ MergeProducer . REQUESTED . getAndAdd (producer , -emitted );
761754 return emitted ;
762755 }
763756
0 commit comments