Skip to content

Commit 9a6f9cb

Browse files
Added OperationRepeat & repeat operator
1 parent 19b954f commit 9a6f9cb

File tree

1 file changed

+36
-39
lines changed

1 file changed

+36
-39
lines changed

rxjava-core/src/main/java/rx/operators/OperationRepeat.java

Lines changed: 36 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -18,53 +18,50 @@
1818

1919
import rx.Observable;
2020
import rx.Observer;
21-
import rx.Scheduler;
2221
import rx.Subscription;
2322
import rx.concurrency.Schedulers;
24-
import rx.subscriptions.MultipleAssignmentSubscription;
25-
import rx.util.functions.Func2;
23+
import rx.subscriptions.SerialSubscription;
24+
import rx.util.functions.Action0;
25+
import rx.util.functions.Action1;
2626

27-
public final class OperationRepeat {
27+
public class OperationRepeat<T> implements Observable.OnSubscribeFunc<T> {
2828

29-
public static <T> Observable.OnSubscribeFunc<T> repeat(Observable<? extends T> source) {
30-
return new RepeatObservable<T>(source);
31-
}
32-
33-
static class RepeatObservable<T> implements Observable.OnSubscribeFunc<T> {
29+
private final Observable<T> source;
3430

35-
RepeatObservable(Observable<? extends T> source) {
36-
this.source = source;
37-
}
31+
public static <T> Observable.OnSubscribeFunc<T> repeat(Observable<T> seed) {
32+
return new OperationRepeat(seed);
33+
}
3834

39-
private Observable<? extends T> source;
40-
private Observer<? super T> observer;
41-
private MultipleAssignmentSubscription subscription = new MultipleAssignmentSubscription();
35+
private OperationRepeat(Observable<T> source) {
36+
this.source = source;
37+
}
4238

43-
@Override
44-
public Subscription onSubscribe(Observer observer) {
45-
this.observer = observer;
46-
Loop();
47-
return subscription;
48-
}
39+
@Override
40+
public Subscription onSubscribe(final Observer<? super T> observer) {
41+
final SerialSubscription subscription = new SerialSubscription();
42+
subscription.setSubscription(Schedulers.currentThread().schedule(new Action1<Action0>() {
43+
@Override
44+
public void call(final Action0 self) {
45+
subscription.setSubscription(source.subscribe(new Observer<T>() {
4946

50-
void Loop() {
51-
subscription.setSubscription(Schedulers.currentThread().schedule(0, new Func2<Scheduler, Integer, Subscription>() {
52-
@Override
53-
public Subscription call(Scheduler s, Integer n) {
54-
return source.subscribe(new Observer<T>() {
55-
@Override
56-
public void onCompleted() { Loop(); }
47+
@Override
48+
public void onCompleted() {
49+
subscription.getSubscription().unsubscribe();
50+
self.call();
51+
}
5752

58-
@Override
59-
public void onError(Throwable error) { observer.onError(error); }
53+
@Override
54+
public void onError(Throwable error) {
55+
observer.onError(error);
56+
}
6057

61-
@Override
62-
public void onNext(T value) { observer.onNext(value); }
63-
});
64-
}
65-
}));
66-
}
58+
@Override
59+
public void onNext(T value) {
60+
observer.onNext(value);
61+
}
62+
}));
63+
}
64+
}));
65+
return subscription;
6766
}
68-
69-
70-
}
67+
}

0 commit comments

Comments
 (0)