Skip to content

Commit 7199792

Browse files
committed
onStart requests should be additive (and check for overflow)
1 parent a82800c commit 7199792

File tree

2 files changed

+66
-4
lines changed

2 files changed

+66
-4
lines changed

src/main/java/rx/Subscriber.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,16 @@ protected final void request(long n) {
103103
synchronized (this) {
104104
if (p != null) {
105105
shouldRequest = p;
106-
} else {
106+
} else if (requested == Long.MIN_VALUE) {
107107
requested = n;
108+
} else {
109+
final long total = requested + n;
110+
// check if overflow occurred
111+
if (total < 0) {
112+
requested = Long.MAX_VALUE;
113+
} else {
114+
requested = total;
115+
}
108116
}
109117
}
110118
// after releasing lock

src/test/java/rx/SubscriberTest.java

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
import static org.junit.Assert.assertEquals;
1919
import static org.junit.Assert.assertTrue;
2020

21+
import java.util.ArrayList;
22+
import java.util.Arrays;
23+
import java.util.List;
2124
import java.util.concurrent.CountDownLatch;
2225
import java.util.concurrent.TimeUnit;
2326
import java.util.concurrent.atomic.AtomicInteger;
@@ -343,7 +346,6 @@ public void onError(Throwable e) {
343346

344347
@Override
345348
public void onNext(Integer t) {
346-
System.out.println(t);
347349
request(1);
348350
}
349351

@@ -375,7 +377,6 @@ public void onError(Throwable e) {
375377

376378
@Override
377379
public void onNext(Integer t) {
378-
System.out.println(t);
379380
request(1);
380381
}
381382

@@ -411,7 +412,6 @@ public void onError(Throwable e) {
411412

412413
@Override
413414
public void onNext(Integer t) {
414-
System.out.println(t);
415415
child.onNext(t);
416416
request(1);
417417
}
@@ -454,4 +454,58 @@ public void onNext(Integer t) {
454454
assertTrue(latch.await(10, TimeUnit.SECONDS));
455455
assertTrue(exception.get() instanceof IllegalArgumentException);
456456
}
457+
458+
@Test
459+
public void testOnStartRequestsAreAdditive() {
460+
final List<Integer> list = new ArrayList<Integer>();
461+
Observable.just(1,2,3,4,5).subscribe(new Subscriber<Integer>() {
462+
@Override
463+
public void onStart() {
464+
request(3);
465+
request(2);
466+
}
467+
468+
@Override
469+
public void onCompleted() {
470+
471+
}
472+
473+
@Override
474+
public void onError(Throwable e) {
475+
476+
}
477+
478+
@Override
479+
public void onNext(Integer t) {
480+
list.add(t);
481+
}});
482+
assertEquals(Arrays.asList(1,2,3,4,5), list);
483+
}
484+
485+
@Test
486+
public void testOnStartRequestsAreAdditiveAndOverflowBecomesMaxValue() {
487+
final List<Integer> list = new ArrayList<Integer>();
488+
Observable.just(1,2,3,4,5).subscribe(new Subscriber<Integer>() {
489+
@Override
490+
public void onStart() {
491+
request(2);
492+
request(Long.MAX_VALUE-1);
493+
}
494+
495+
@Override
496+
public void onCompleted() {
497+
498+
}
499+
500+
@Override
501+
public void onError(Throwable e) {
502+
503+
}
504+
505+
@Override
506+
public void onNext(Integer t) {
507+
list.add(t);
508+
}});
509+
assertEquals(Arrays.asList(1,2,3,4,5), list);
510+
}
457511
}

0 commit comments

Comments
 (0)