Skip to content

Commit e89a39f

Browse files
committed
Handle overflow
1 parent 676313a commit e89a39f

File tree

2 files changed

+124
-6
lines changed

2 files changed

+124
-6
lines changed

rxjava-core/src/main/java/rx/internal/operators/OperatorBufferWithSize.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.util.Iterator;
2020
import java.util.LinkedList;
2121
import java.util.List;
22-
import java.util.concurrent.atomic.AtomicBoolean;
2322

2423
import rx.Observable;
2524
import rx.Observable.Operator;
@@ -73,9 +72,17 @@ public Subscriber<? super T> call(final Subscriber<? super List<T>> child) {
7372
@Override
7473
public void setProducer(final Producer producer) {
7574
child.setProducer(new Producer() {
75+
76+
private volatile boolean infinite = false;
77+
7678
@Override
7779
public void request(long n) {
78-
if (n == Long.MAX_VALUE) {
80+
if (infinite) {
81+
return;
82+
}
83+
if (n >= Long.MAX_VALUE / count) {
84+
// n == Long.MAX_VALUE or n * count >= Long.MAX_VALUE
85+
infinite = true;
7986
producer.request(Long.MAX_VALUE);
8087
} else {
8188
producer.request(n * count);
@@ -127,21 +134,42 @@ public void onCompleted() {
127134
public void setProducer(final Producer producer) {
128135
child.setProducer(new Producer() {
129136

130-
private final AtomicBoolean firstRequest = new AtomicBoolean(false);
137+
private volatile boolean firstRequest = true;
138+
private volatile boolean infinite = false;
139+
140+
private void requestInfinite() {
141+
infinite = true;
142+
producer.request(Long.MAX_VALUE);
143+
}
131144

132145
@Override
133146
public void request(long n) {
147+
if (infinite) {
148+
return;
149+
}
134150
if (n == Long.MAX_VALUE) {
135-
producer.request(Long.MAX_VALUE);
151+
requestInfinite();
152+
return;
136153
} else {
137-
if (firstRequest.compareAndSet(false, true)) {
154+
if (firstRequest) {
155+
firstRequest = false;
156+
if (n - 1 >= (Long.MAX_VALUE - count) / skip) {
157+
// count + skip * (n - 1) >= Long.MAX_VALUE
158+
requestInfinite();
159+
return;
160+
}
138161
// count = 5, skip = 2, n = 3
139162
// * * * * *
140163
// * * * * *
141164
// * * * * *
142165
// request = 5 + 2 * ( 3 - 1)
143166
producer.request(count + skip * (n - 1));
144167
} else {
168+
if (n >= Long.MAX_VALUE / skip) {
169+
// skip * n >= Long.MAX_VALUE
170+
requestInfinite();
171+
return;
172+
}
145173
// count = 5, skip = 2, n = 3
146174
// (* * *) * *
147175
// ( *) * * * *

rxjava-core/src/test/java/rx/internal/operators/OperatorBufferTest.java

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -869,7 +869,6 @@ public void request(long n) {
869869
assertEquals(6, requested.get());
870870
}
871871

872-
873872
@Test
874873
public void testProducerRequestThroughBufferWithSize4() {
875874
TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
@@ -891,4 +890,95 @@ public void request(long n) {
891890
}).buffer(5, 2).subscribe(ts);
892891
assertEquals(Long.MAX_VALUE, requested.get());
893892
}
893+
894+
895+
@Test
896+
public void testProducerRequestOverflowThroughBufferWithSize1() {
897+
TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
898+
ts.requestMore(Long.MAX_VALUE / 2);
899+
final AtomicLong requested = new AtomicLong();
900+
Observable.create(new Observable.OnSubscribe<Integer>() {
901+
902+
@Override
903+
public void call(Subscriber<? super Integer> s) {
904+
s.setProducer(new Producer() {
905+
906+
@Override
907+
public void request(long n) {
908+
requested.set(n);
909+
}
910+
911+
});
912+
}
913+
914+
}).buffer(3, 3).subscribe(ts);
915+
assertEquals(Long.MAX_VALUE, requested.get());
916+
}
917+
918+
@Test
919+
public void testProducerRequestOverflowThroughBufferWithSize2() {
920+
TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
921+
ts.requestMore(Long.MAX_VALUE / 2);
922+
final AtomicLong requested = new AtomicLong();
923+
Observable.create(new Observable.OnSubscribe<Integer>() {
924+
925+
@Override
926+
public void call(Subscriber<? super Integer> s) {
927+
s.setProducer(new Producer() {
928+
929+
@Override
930+
public void request(long n) {
931+
requested.set(n);
932+
}
933+
934+
});
935+
}
936+
937+
}).buffer(3, 2).subscribe(ts);
938+
assertEquals(Long.MAX_VALUE, requested.get());
939+
}
940+
941+
@Test
942+
public void testProducerRequestOverflowThroughBufferWithSize3() {
943+
final AtomicLong requested = new AtomicLong();
944+
Observable.create(new Observable.OnSubscribe<Integer>() {
945+
946+
@Override
947+
public void call(final Subscriber<? super Integer> s) {
948+
s.setProducer(new Producer() {
949+
950+
@Override
951+
public void request(long n) {
952+
requested.set(n);
953+
s.onNext(1);
954+
s.onNext(2);
955+
s.onNext(3);
956+
}
957+
958+
});
959+
}
960+
961+
}).buffer(3, 2).subscribe(new Subscriber<List<Integer>>() {
962+
963+
@Override
964+
public void onStart() {
965+
request(Long.MAX_VALUE / 2 - 4);
966+
}
967+
968+
@Override
969+
public void onCompleted() {
970+
}
971+
972+
@Override
973+
public void onError(Throwable e) {
974+
}
975+
976+
@Override
977+
public void onNext(List<Integer> t) {
978+
request(Long.MAX_VALUE / 2);
979+
}
980+
981+
});
982+
assertEquals(Long.MAX_VALUE, requested.get());
983+
}
894984
}

0 commit comments

Comments
 (0)