Skip to content

Commit 65c4a85

Browse files
Test Repeat with SubscribeOn
1 parent e0f970c commit 65c4a85

File tree

1 file changed

+38
-0
lines changed

1 file changed

+38
-0
lines changed

rxjava-core/src/test/java/rx/operators/OperationRepeatTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,14 @@
2222
import org.junit.Test;
2323

2424
import rx.Observable;
25+
import rx.Observable.OnSubscribe;
2526
import rx.Observable.OnSubscribeFunc;
2627
import rx.Observer;
28+
import rx.Subscriber;
2729
import rx.Subscription;
2830
import rx.schedulers.Schedulers;
2931
import rx.subscriptions.Subscriptions;
32+
import rx.util.functions.Func1;
3033

3134
public class OperationRepeatTest {
3235

@@ -59,4 +62,39 @@ public void testNoStackOverFlow() {
5962
Observable.from(1).repeat(Schedulers.newThread()).take(100000).toBlockingObservable().last();
6063
}
6164

65+
@Test
66+
public void testRepeatTakeWithSubscribeOn() throws InterruptedException {
67+
68+
final AtomicInteger counter = new AtomicInteger();
69+
Observable<Integer> oi = Observable.create(new OnSubscribe<Integer>() {
70+
71+
@Override
72+
public void call(Subscriber<? super Integer> sub) {
73+
System.out.println("invoked!");
74+
counter.incrementAndGet();
75+
sub.onNext(1);
76+
sub.onNext(2);
77+
sub.onCompleted();
78+
}
79+
}).subscribeOn(Schedulers.newThread());
80+
81+
Object[] ys = oi.repeat(Schedulers.newThread()).map(new Func1<Integer, Integer>() {
82+
83+
@Override
84+
public Integer call(Integer t1) {
85+
System.out.println("t1: " + t1);
86+
try {
87+
Thread.sleep(50);
88+
} catch (InterruptedException e) {
89+
e.printStackTrace();
90+
}
91+
return t1;
92+
}
93+
94+
}).take(4).toList().toBlockingObservable().last().toArray();
95+
96+
assertEquals(2, counter.get());
97+
assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
98+
}
99+
62100
}

0 commit comments

Comments
 (0)