Skip to content

Commit d8f9e86

Browse files
committed
fix skip race conditions and request overflow
1 parent 96c903e commit d8f9e86

File tree

2 files changed

+43
-14
lines changed

2 files changed

+43
-14
lines changed

src/main/java/rx/internal/operators/OperatorSkip.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import java.util.concurrent.atomic.AtomicBoolean;
19+
1820
import rx.Observable;
1921
import rx.Producer;
2022
import rx.Subscriber;
@@ -63,19 +65,8 @@ public void onNext(T t) {
6365

6466
@Override
6567
public void setProducer(final Producer producer) {
66-
child.setProducer(new Producer() {
67-
68-
@Override
69-
public void request(long n) {
70-
if (n == Long.MAX_VALUE) {
71-
// infinite so leave it alone
72-
producer.request(n);
73-
} else if (n > 0) {
74-
// add the skip num to the requested amount, since we'll skip everything and then emit to the buffer downstream
75-
producer.request(n + (toSkip - skipped));
76-
}
77-
}
78-
});
68+
child.setProducer(producer);
69+
producer.request(toSkip);
7970
}
8071

8172
};

src/test/java/rx/internal/operators/OperatorSkipTest.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,23 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertEquals;
1819
import static org.mockito.Matchers.any;
1920
import static org.mockito.Mockito.mock;
2021
import static org.mockito.Mockito.never;
2122
import static org.mockito.Mockito.times;
2223
import static org.mockito.Mockito.verify;
2324

25+
import java.util.Arrays;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.AtomicLong;
28+
2429
import org.junit.Test;
2530

2631
import rx.Observable;
2732
import rx.Observer;
28-
import rx.internal.operators.OperatorSkip;
33+
import rx.functions.Action1;
34+
import rx.observers.TestSubscriber;
2935

3036
public class OperatorSkipTest {
3137

@@ -144,4 +150,36 @@ public void testSkipError() {
144150
verify(observer, never()).onCompleted();
145151

146152
}
153+
154+
@Test
155+
public void testBackpressureMultipleSmallAsyncRequests() throws InterruptedException {
156+
final AtomicLong requests = new AtomicLong(0);
157+
TestSubscriber<Long> ts = new TestSubscriber<Long>(0);
158+
Observable.interval(100, TimeUnit.MILLISECONDS)
159+
.doOnRequest(new Action1<Long>() {
160+
@Override
161+
public void call(Long n) {
162+
requests.addAndGet(n);
163+
}
164+
}).skip(4).subscribe(ts);
165+
Thread.sleep(100);
166+
ts.requestMore(1);
167+
ts.requestMore(1);
168+
Thread.sleep(100);
169+
ts.unsubscribe();
170+
ts.assertUnsubscribed();
171+
ts.assertNoErrors();
172+
assertEquals(6, requests.get());
173+
}
174+
175+
@Test
176+
public void testRequestOverflowDoesNotOccur() {
177+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>(Long.MAX_VALUE-1);
178+
Observable.range(1, 10).skip(5).subscribe(ts);
179+
ts.assertTerminalEvent();
180+
ts.assertCompleted();
181+
ts.assertNoErrors();
182+
assertEquals(Arrays.asList(6,7,8,9,10), ts.getOnNextEvents());
183+
}
184+
147185
}

0 commit comments

Comments
 (0)