File tree Expand file tree Collapse file tree 1 file changed +4
-4
lines changed
rxjava-core/src/main/java/rx/internal/operators Expand file tree Collapse file tree 1 file changed +4
-4
lines changed Original file line number Diff line number Diff line change @@ -185,7 +185,7 @@ private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronou
185
185
if (r > 0 ) {
186
186
emitted = true ;
187
187
actual .onNext (t .get ());
188
- mergeProducer .REQUESTED .decrementAndGet (mergeProducer );
188
+ MergeProducer .REQUESTED .decrementAndGet (mergeProducer );
189
189
// we handle this Observable without ever incrementing the wip or touching other machinery so just return here
190
190
return ;
191
191
}
@@ -295,7 +295,7 @@ private int drainScalarValueQueue() {
295
295
}
296
296
}
297
297
// decrement the number we emitted from outstanding requests
298
- mergeProducer .REQUESTED .getAndAdd (mergeProducer , -emittedWhileDraining );
298
+ MergeProducer .REQUESTED .getAndAdd (mergeProducer , -emittedWhileDraining );
299
299
}
300
300
return emittedWhileDraining ;
301
301
}
@@ -500,7 +500,7 @@ private void emit(T t, boolean complete) {
500
500
} else {
501
501
parentSubscriber .actual .onNext (t );
502
502
emitted ++;
503
- producer .REQUESTED .decrementAndGet (producer );
503
+ MergeProducer .REQUESTED .decrementAndGet (producer );
504
504
}
505
505
} else {
506
506
// no requests available, so enqueue it
@@ -587,7 +587,7 @@ private int drainRequested() {
587
587
}
588
588
589
589
// decrement the number we emitted from outstanding requests
590
- producer .REQUESTED .getAndAdd (producer , -emitted );
590
+ MergeProducer .REQUESTED .getAndAdd (producer , -emitted );
591
591
return emitted ;
592
592
}
593
593
You can’t perform that action at this time.
0 commit comments