Skip to content

Commit 19b954f

Browse files
Added OperationRepeat & repeat operator
1 parent 78a7a6e commit 19b954f

File tree

2 files changed

+95
-93
lines changed

2 files changed

+95
-93
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 25 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -15,83 +15,11 @@
1515
*/
1616
package rx;
1717

18-
import static rx.util.functions.Functions.*;
19-
20-
import java.util.ArrayList;
21-
import java.util.Arrays;
22-
import java.util.Comparator;
23-
import java.util.List;
24-
import java.util.concurrent.ConcurrentHashMap;
25-
import java.util.concurrent.Future;
26-
import java.util.concurrent.TimeUnit;
27-
2818
import rx.concurrency.Schedulers;
2919
import rx.observables.BlockingObservable;
3020
import rx.observables.ConnectableObservable;
3121
import rx.observables.GroupedObservable;
32-
import rx.operators.OperationAll;
33-
import rx.operators.OperationAmb;
34-
import rx.operators.OperationAny;
35-
import rx.operators.OperationAverage;
36-
import rx.operators.OperationBuffer;
37-
import rx.operators.OperationCache;
38-
import rx.operators.OperationCast;
39-
import rx.operators.OperationCombineLatest;
40-
import rx.operators.OperationConcat;
41-
import rx.operators.OperationDebounce;
42-
import rx.operators.OperationDefaultIfEmpty;
43-
import rx.operators.OperationDefer;
44-
import rx.operators.OperationDematerialize;
45-
import rx.operators.OperationDistinct;
46-
import rx.operators.OperationDistinctUntilChanged;
47-
import rx.operators.OperationDoOnEach;
48-
import rx.operators.OperationElementAt;
49-
import rx.operators.OperationFilter;
50-
import rx.operators.OperationFinally;
51-
import rx.operators.OperationFirstOrDefault;
52-
import rx.operators.OperationGroupBy;
53-
import rx.operators.OperationInterval;
54-
import rx.operators.OperationLast;
55-
import rx.operators.OperationMap;
56-
import rx.operators.OperationMaterialize;
57-
import rx.operators.OperationMerge;
58-
import rx.operators.OperationMergeDelayError;
59-
import rx.operators.OperationMinMax;
60-
import rx.operators.OperationMulticast;
61-
import rx.operators.OperationObserveOn;
62-
import rx.operators.OperationOnErrorResumeNextViaFunction;
63-
import rx.operators.OperationOnErrorResumeNextViaObservable;
64-
import rx.operators.OperationOnErrorReturn;
65-
import rx.operators.OperationOnExceptionResumeNextViaObservable;
66-
import rx.operators.OperationParallel;
67-
import rx.operators.OperationParallelMerge;
68-
import rx.operators.OperationRetry;
69-
import rx.operators.OperationSample;
70-
import rx.operators.OperationScan;
71-
import rx.operators.OperationSkip;
72-
import rx.operators.OperationSkipLast;
73-
import rx.operators.OperationSkipWhile;
74-
import rx.operators.OperationSubscribeOn;
75-
import rx.operators.OperationSum;
76-
import rx.operators.OperationSwitch;
77-
import rx.operators.OperationSynchronize;
78-
import rx.operators.OperationTake;
79-
import rx.operators.OperationTakeLast;
80-
import rx.operators.OperationTakeUntil;
81-
import rx.operators.OperationTakeWhile;
82-
import rx.operators.OperationThrottleFirst;
83-
import rx.operators.OperationTimeInterval;
84-
import rx.operators.OperationTimeout;
85-
import rx.operators.OperationTimestamp;
86-
import rx.operators.OperationToObservableFuture;
87-
import rx.operators.OperationToObservableIterable;
88-
import rx.operators.OperationToObservableList;
89-
import rx.operators.OperationToObservableSortedList;
90-
import rx.operators.OperationUsing;
91-
import rx.operators.OperationWindow;
92-
import rx.operators.OperationZip;
93-
import rx.operators.SafeObservableSubscription;
94-
import rx.operators.SafeObserver;
22+
import rx.operators.*;
9523
import rx.plugins.RxJavaErrorHandler;
9624
import rx.plugins.RxJavaObservableExecutionHook;
9725
import rx.plugins.RxJavaPlugins;
@@ -100,26 +28,19 @@
10028
import rx.subjects.ReplaySubject;
10129
import rx.subjects.Subject;
10230
import rx.subscriptions.Subscriptions;
103-
import rx.util.Closing;
104-
import rx.util.OnErrorNotImplementedException;
105-
import rx.util.Opening;
106-
import rx.util.Range;
107-
import rx.util.TimeInterval;
108-
import rx.util.Timestamped;
109-
import rx.util.functions.Action0;
110-
import rx.util.functions.Action1;
111-
import rx.util.functions.Func0;
112-
import rx.util.functions.Func1;
113-
import rx.util.functions.Func2;
114-
import rx.util.functions.Func3;
115-
import rx.util.functions.Func4;
116-
import rx.util.functions.Func5;
117-
import rx.util.functions.Func6;
118-
import rx.util.functions.Func7;
119-
import rx.util.functions.Func8;
120-
import rx.util.functions.Func9;
121-
import rx.util.functions.FuncN;
122-
import rx.util.functions.Function;
31+
import rx.util.*;
32+
import rx.util.functions.*;
33+
34+
import java.util.ArrayList;
35+
import java.util.Arrays;
36+
import java.util.Comparator;
37+
import java.util.List;
38+
import java.util.concurrent.ConcurrentHashMap;
39+
import java.util.concurrent.Future;
40+
import java.util.concurrent.TimeUnit;
41+
42+
import static rx.util.functions.Functions.alwaysFalse;
43+
import static rx.util.functions.Functions.not;
12344

12445
/**
12546
* The Observable interface that implements the Reactive Pattern.
@@ -1040,6 +961,17 @@ public static Observable<Integer> range(int start, int count, Scheduler schedule
1040961
return range(start, count).observeOn(scheduler);
1041962
}
1042963

964+
/**
965+
* Repeats the observable sequence indefinitely.
966+
* <p>
967+
*
968+
* @return The observable sequence producing the elements of the given sequence repeatedly and sequentially.
969+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428(v=vs.103).aspx">MSDN: Observable.Repeat</a>
970+
*/
971+
public Observable<T> repeat() {
972+
return create(rx.operators.OperationRepeat.repeat(this));
973+
}
974+
1043975
/**
1044976
* Returns an Observable that calls an Observable factory to create its
1045977
* Observable for each new Observer that subscribes. That is, for each
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.operators;
18+
19+
import rx.Observable;
20+
import rx.Observer;
21+
import rx.Scheduler;
22+
import rx.Subscription;
23+
import rx.concurrency.Schedulers;
24+
import rx.subscriptions.MultipleAssignmentSubscription;
25+
import rx.util.functions.Func2;
26+
27+
public final class OperationRepeat {
28+
29+
public static <T> Observable.OnSubscribeFunc<T> repeat(Observable<? extends T> source) {
30+
return new RepeatObservable<T>(source);
31+
}
32+
33+
static class RepeatObservable<T> implements Observable.OnSubscribeFunc<T> {
34+
35+
RepeatObservable(Observable<? extends T> source) {
36+
this.source = source;
37+
}
38+
39+
private Observable<? extends T> source;
40+
private Observer<? super T> observer;
41+
private MultipleAssignmentSubscription subscription = new MultipleAssignmentSubscription();
42+
43+
@Override
44+
public Subscription onSubscribe(Observer observer) {
45+
this.observer = observer;
46+
Loop();
47+
return subscription;
48+
}
49+
50+
void Loop() {
51+
subscription.setSubscription(Schedulers.currentThread().schedule(0, new Func2<Scheduler, Integer, Subscription>() {
52+
@Override
53+
public Subscription call(Scheduler s, Integer n) {
54+
return source.subscribe(new Observer<T>() {
55+
@Override
56+
public void onCompleted() { Loop(); }
57+
58+
@Override
59+
public void onError(Throwable error) { observer.onError(error); }
60+
61+
@Override
62+
public void onNext(T value) { observer.onNext(value); }
63+
});
64+
}
65+
}));
66+
}
67+
}
68+
69+
70+
}

0 commit comments

Comments
 (0)