File tree Expand file tree Collapse file tree 2 files changed +31
-1
lines changed
main/java/rx/internal/operators
test/java/rx/internal/operators Expand file tree Collapse file tree 2 files changed +31
-1
lines changed Original file line number Diff line number Diff line change @@ -55,7 +55,7 @@ public void request(long n) {
5555 if (n == Long .MAX_VALUE ) {
5656 _c = REQUESTED_UPDATER .getAndSet (this , Long .MAX_VALUE );
5757 } else {
58- _c = REQUESTED_UPDATER . getAndAdd ( this , n );
58+ _c = BackpressureUtils . getAndAddRequest ( REQUESTED_UPDATER , this , n );
5959 }
6060 if (!emittingStarted ) {
6161 // we haven't started yet, so record what was requested and return
Original file line number Diff line number Diff line change 2323import static org .mockito .Mockito .times ;
2424import static org .mockito .Mockito .verify ;
2525
26+ import java .util .ArrayList ;
2627import java .util .Arrays ;
28+ import java .util .List ;
2729import java .util .concurrent .atomic .AtomicInteger ;
2830
2931import org .junit .Test ;
@@ -293,4 +295,32 @@ public void onNext(Integer integer) {
293295 });
294296 assertEquals (1 ,count .get ());
295297 }
298+
299+ @ Test (timeout =10000 )
300+ public void testRequestOverflow () {
301+ final List <Integer > list = new ArrayList <Integer >();
302+ Observable .range (1 , 100 ).takeLast (50 ).subscribe (new Subscriber <Integer >() {
303+
304+ @ Override
305+ public void onStart () {
306+ request (2 );
307+ }
308+
309+ @ Override
310+ public void onCompleted () {
311+
312+ }
313+
314+ @ Override
315+ public void onError (Throwable e ) {
316+
317+ }
318+
319+ @ Override
320+ public void onNext (Integer t ) {
321+ list .add (t );
322+ request (Long .MAX_VALUE -1 );
323+ }});
324+ assertEquals (50 , list .size ());
325+ }
296326}
You can’t perform that action at this time.
0 commit comments