Skip to content

Commit 7a9ae10

Browse files
committed
Range concurrency & off-by-one bugfix
1 parent cfaae57 commit 7a9ae10

File tree

2 files changed

+70
-9
lines changed

2 files changed

+70
-9
lines changed

rxjava-core/src/main/java/rx/internal/operators/OnSubscribeRange.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,13 @@ private static final class RangeProducer implements Producer {
4545
// accessed by REQUESTED_UPDATER
4646
private volatile int requested;
4747
private static final AtomicIntegerFieldUpdater<RangeProducer> REQUESTED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(RangeProducer.class, "requested");
48-
private volatile int index;
48+
private int index;
4949
private final int end;
50-
private final int start;
5150

5251
private RangeProducer(Subscriber<? super Integer> o, int start, int end) {
5352
this.o = o;
5453
this.index = start;
5554
this.end = end;
56-
this.start = start;
5755
}
5856

5957
@Override
@@ -76,17 +74,20 @@ public void request(int n) {
7674
* This complicated logic is done to avoid touching the volatile `index` and `requested` values
7775
* during the loop itself. If they are touched during the loop the performance is impacted significantly.
7876
*/
79-
int numLeft = start + (end - index);
80-
int e = Math.min(numLeft, requested);
81-
boolean completeOnFinish = numLeft < requested;
82-
int stopAt = e + index;
83-
for (int i = index; i < stopAt; i++) {
77+
int r = requested;
78+
int idx = index;
79+
int numLeft = end - idx + 1;
80+
int e = Math.min(numLeft, r);
81+
boolean completeOnFinish = numLeft <= r;
82+
int stopAt = e + idx;
83+
for (int i = idx; i < stopAt; i++) {
8484
if (o.isUnsubscribed()) {
8585
return;
8686
}
8787
o.onNext(i);
8888
}
89-
index += e;
89+
index = stopAt;
90+
9091
if (completeOnFinish) {
9192
o.onCompleted();
9293
return;

rxjava-core/src/test/java/rx/internal/operators/OnSubscribeRangeTest.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.ArrayList;
2626
import java.util.Arrays;
2727
import java.util.Collections;
28+
import java.util.List;
2829
import java.util.concurrent.atomic.AtomicInteger;
2930

3031
import org.junit.Test;
@@ -130,4 +131,63 @@ public void testNoBackpressure() {
130131
ts.assertReceivedOnNext(list);
131132
ts.assertTerminalEvent();
132133
}
134+
void testWithBackpressureOneByOne(int start) {
135+
Observable<Integer> source = Observable.range(start, 100);
136+
137+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
138+
ts.request(1);
139+
source.subscribe(ts);
140+
141+
List<Integer> list = new ArrayList<Integer>(100);
142+
for (int i = 0; i < 100; i++) {
143+
list.add(i + start);
144+
ts.request(1);
145+
}
146+
ts.assertReceivedOnNext(list);
147+
ts.assertTerminalEvent();
148+
}
149+
void testWithBackpressureAllAtOnce(int start) {
150+
Observable<Integer> source = Observable.range(start, 100);
151+
152+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
153+
ts.request(100);
154+
source.subscribe(ts);
155+
156+
List<Integer> list = new ArrayList<Integer>(100);
157+
for (int i = 0; i < 100; i++) {
158+
list.add(i + start);
159+
}
160+
ts.assertReceivedOnNext(list);
161+
ts.assertTerminalEvent();
162+
}
163+
@Test
164+
public void testWithBackpressure1() {
165+
for (int i = 0; i < 100; i++) {
166+
testWithBackpressureOneByOne(i);
167+
}
168+
}
169+
@Test
170+
public void testWithBackpressureAllAtOnce() {
171+
for (int i = 0; i < 100; i++) {
172+
testWithBackpressureAllAtOnce(i);
173+
}
174+
}
175+
@Test
176+
public void testWithBackpressureRequestWayMore() {
177+
Observable<Integer> source = Observable.range(50, 100);
178+
179+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
180+
ts.request(150);
181+
source.subscribe(ts);
182+
183+
List<Integer> list = new ArrayList<Integer>(100);
184+
for (int i = 0; i < 100; i++) {
185+
list.add(i + 50);
186+
}
187+
188+
ts.request(50); // and then some
189+
190+
ts.assertReceivedOnNext(list);
191+
ts.assertTerminalEvent();
192+
}
133193
}

0 commit comments

Comments
 (0)