File tree Expand file tree Collapse file tree 2 files changed +42
-1
lines changed
main/java/rx/internal/operators
test/java/rx/internal/operators Expand file tree Collapse file tree 2 files changed +42
-1
lines changed Original file line number Diff line number Diff line change @@ -545,7 +545,16 @@ public void request(long n) {
545545 if (n == Long .MAX_VALUE ) {
546546 requested = Long .MAX_VALUE ;
547547 } else {
548- REQUESTED .getAndAdd (this , n );
548+ // add n to requested but check for overflow
549+ while (true ) {
550+ long current = REQUESTED .get (this );
551+ long next = current + n ;
552+ //check for overflow
553+ if (next < 0 )
554+ next = Long .MAX_VALUE ;
555+ if (REQUESTED .compareAndSet (this , current , next ))
556+ break ;
557+ }
549558 if (ms .drainQueuesIfNeeded ()) {
550559 boolean sendComplete = false ;
551560 synchronized (ms ) {
Original file line number Diff line number Diff line change @@ -1183,6 +1183,38 @@ public void call() {
11831183 assertTrue (a );
11841184 //}
11851185 }
1186+
1187+ @ Test
1188+ public void testMergeRequestOverflow () throws InterruptedException {
1189+ //do a non-trivial merge so that future optimisations with EMPTY don't invalidate this test
1190+ Observable <Integer > o = Observable .from (Arrays .asList (1 ,2 )).mergeWith (Observable .from (Arrays .asList (3 ,4 )));
1191+ final int expectedCount = 4 ;
1192+ final CountDownLatch latch = new CountDownLatch (expectedCount );
1193+ o .subscribeOn (Schedulers .computation ()).subscribe (new Subscriber <Integer >() {
1194+
1195+ @ Override
1196+ public void onStart () {
1197+ request (1 );
1198+ }
1199+
1200+ @ Override
1201+ public void onCompleted () {
1202+ //ignore
1203+ }
1204+
1205+ @ Override
1206+ public void onError (Throwable e ) {
1207+ throw new RuntimeException (e );
1208+ }
1209+
1210+ @ Override
1211+ public void onNext (Integer t ) {
1212+ latch .countDown ();
1213+ request (2 );
1214+ request (Long .MAX_VALUE -1 );
1215+ }});
1216+ assertTrue (latch .await (10 , TimeUnit .SECONDS ));
1217+ }
11861218
11871219 private static Action1 <Integer > printCount () {
11881220 return new Action1 <Integer >() {
You can’t perform that action at this time.
0 commit comments