Skip to content

Commit 1a7e51f

Browse files
Added test
1 parent 9a6f9cb commit 1a7e51f

File tree

3 files changed

+44
-24
lines changed

3 files changed

+44
-24
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -969,7 +969,18 @@ public static Observable<Integer> range(int start, int count, Scheduler schedule
969969
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428(v=vs.103).aspx">MSDN: Observable.Repeat</a>
970970
*/
971971
public Observable<T> repeat() {
972-
return create(rx.operators.OperationRepeat.repeat(this));
972+
return this.repeat(Schedulers.currentThread());
973+
}
974+
975+
/**
976+
* Repeats the observable sequence indefinitely.
977+
* <p>
978+
* @param scheduler the scheduler to send the values on.
979+
* @return The observable sequence producing the elements of the given sequence repeatedly and sequentially.
980+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428(v=vs.103).aspx">MSDN: Observable.Repeat</a>
981+
*/
982+
public Observable<T> repeat(Scheduler scheduler) {
983+
return create(OperationRepeat.repeat(this, scheduler));
973984
}
974985

975986
/**

rxjava-core/src/main/java/rx/operators/OperationRepeat.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,35 +18,36 @@
1818

1919
import rx.Observable;
2020
import rx.Observer;
21+
import rx.Scheduler;
2122
import rx.Subscription;
22-
import rx.concurrency.Schedulers;
23-
import rx.subscriptions.SerialSubscription;
23+
import rx.subscriptions.MultipleAssignmentSubscription;
2424
import rx.util.functions.Action0;
2525
import rx.util.functions.Action1;
2626

2727
public class OperationRepeat<T> implements Observable.OnSubscribeFunc<T> {
2828

2929
private final Observable<T> source;
30+
private final Scheduler scheduler;
3031

31-
public static <T> Observable.OnSubscribeFunc<T> repeat(Observable<T> seed) {
32-
return new OperationRepeat(seed);
32+
public static <T> Observable.OnSubscribeFunc<T> repeat(Observable<T> source, Scheduler scheduler) {
33+
return new OperationRepeat(source, scheduler);
3334
}
3435

35-
private OperationRepeat(Observable<T> source) {
36+
private OperationRepeat(Observable<T> source, Scheduler scheduler) {
3637
this.source = source;
38+
this.scheduler = scheduler;
3739
}
3840

3941
@Override
4042
public Subscription onSubscribe(final Observer<? super T> observer) {
41-
final SerialSubscription subscription = new SerialSubscription();
42-
subscription.setSubscription(Schedulers.currentThread().schedule(new Action1<Action0>() {
43+
final MultipleAssignmentSubscription subscription = new MultipleAssignmentSubscription();
44+
subscription.setSubscription(scheduler.schedule(new Action1<Action0>() {
4345
@Override
4446
public void call(final Action0 self) {
4547
subscription.setSubscription(source.subscribe(new Observer<T>() {
4648

4749
@Override
4850
public void onCompleted() {
49-
subscription.getSubscription().unsubscribe();
5051
self.call();
5152
}
5253

rxjava-core/src/test/java/rx/ObservableTests.java

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,35 +15,36 @@
1515
*/
1616
package rx;
1717

18-
import static org.junit.Assert.*;
19-
import static org.mockito.Matchers.*;
20-
import static org.mockito.Mockito.*;
21-
22-
import java.util.ArrayList;
23-
import java.util.Arrays;
24-
import java.util.LinkedList;
25-
import java.util.List;
26-
import java.util.concurrent.CountDownLatch;
27-
import java.util.concurrent.TimeUnit;
28-
import java.util.concurrent.atomic.AtomicInteger;
29-
import java.util.concurrent.atomic.AtomicReference;
30-
3118
import org.junit.Before;
3219
import org.junit.Test;
3320
import org.mockito.InOrder;
3421
import org.mockito.Mock;
3522
import org.mockito.MockitoAnnotations;
36-
3723
import rx.Observable.OnSubscribeFunc;
24+
import rx.concurrency.Schedulers;
3825
import rx.concurrency.TestScheduler;
3926
import rx.observables.ConnectableObservable;
4027
import rx.subscriptions.BooleanSubscription;
4128
import rx.subscriptions.Subscriptions;
42-
import rx.util.functions.Action0;
4329
import rx.util.functions.Action1;
4430
import rx.util.functions.Func1;
4531
import rx.util.functions.Func2;
4632

33+
import java.util.ArrayList;
34+
import java.util.Arrays;
35+
import java.util.LinkedList;
36+
import java.util.List;
37+
import java.util.concurrent.CountDownLatch;
38+
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.atomic.AtomicInteger;
40+
import java.util.concurrent.atomic.AtomicReference;
41+
42+
import static org.junit.Assert.*;
43+
import static org.mockito.Matchers.any;
44+
import static org.mockito.Matchers.anyInt;
45+
import static org.mockito.Matchers.anyString;
46+
import static org.mockito.Mockito.*;
47+
4748
public class ObservableTests {
4849

4950
@Mock
@@ -958,4 +959,11 @@ public void testRangeWithScheduler() {
958959
inOrder.verify(aObserver, times(1)).onCompleted();
959960
inOrder.verifyNoMoreInteractions();
960961
}
962+
963+
@Test
964+
public void testRepeatTake() {
965+
Observable<Integer> xs = Observable.from(1,2);
966+
Object[] ys = xs.repeat(Schedulers.newThread()).take(4).toList().toBlockingObservable().last().toArray();
967+
assertArrayEquals(new Object[]{ 1, 2, 1, 2}, ys);
968+
}
961969
}

0 commit comments

Comments
 (0)