Skip to content

Commit 85debff

Browse files
OperatorRepeat
1 parent 65c4a85 commit 85debff

File tree

4 files changed

+101
-85
lines changed

4 files changed

+101
-85
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
import rx.operators.OperationOnErrorReturn;
7070
import rx.operators.OperationOnExceptionResumeNextViaObservable;
7171
import rx.operators.OperationParallelMerge;
72-
import rx.operators.OperationRepeat;
72+
import rx.operators.OperatorRepeat;
7373
import rx.operators.OperationReplay;
7474
import rx.operators.OperationRetry;
7575
import rx.operators.OperationSample;
@@ -1616,7 +1616,7 @@ public final static Observable<Long> interval(long interval, TimeUnit unit, Sche
16161616
public final static <T> Observable<T> just(T value) {
16171617
return from(Arrays.asList(value));
16181618
}
1619-
1619+
16201620
/**
16211621
* Returns an Observable that emits a single item and then completes, on a specified scheduler.
16221622
* <p>
@@ -2355,6 +2355,15 @@ public final static <T extends Comparable<? super T>> Observable<T> min(Observab
23552355
return OperationMinMax.min(source);
23562356
}
23572357

2358+
/**
2359+
* Convert the current Observable<T> into an Observable<Observable<T>>.
2360+
*
2361+
* @return
2362+
*/
2363+
private final Observable<Observable<T>> nest() {
2364+
return from(this);
2365+
}
2366+
23582367
/**
23592368
* Returns an Observable that never sends any items or notifications to an {@link Observer}.
23602369
* <p>
@@ -5518,7 +5527,7 @@ public final <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> acc
55185527
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
55195528
*/
55205529
public final Observable<T> repeat() {
5521-
return this.repeat(Schedulers.currentThread());
5530+
return nest().lift(new OperatorRepeat<T>());
55225531
}
55235532

55245533
/**
@@ -5535,7 +5544,7 @@ public final Observable<T> repeat() {
55355544
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
55365545
*/
55375546
public final Observable<T> repeat(Scheduler scheduler) {
5538-
return create(OperationRepeat.repeat(this, scheduler));
5547+
return nest().lift(new OperatorRepeat<T>(scheduler));
55395548
}
55405549

55415550
/**

rxjava-core/src/main/java/rx/operators/OperationRepeat.java

Lines changed: 0 additions & 80 deletions
This file was deleted.
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.operators;
18+
19+
import rx.Observable;
20+
import rx.Scheduler;
21+
import rx.Scheduler.Inner;
22+
import rx.Subscriber;
23+
import rx.schedulers.Schedulers;
24+
import rx.util.functions.Action1;
25+
26+
public class OperatorRepeat<T> implements Operator<T, Observable<T>> {
27+
28+
private final Scheduler scheduler;
29+
30+
public OperatorRepeat(Scheduler scheduler) {
31+
this.scheduler = scheduler;
32+
33+
}
34+
35+
public OperatorRepeat() {
36+
this(Schedulers.trampoline());
37+
}
38+
39+
@Override
40+
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> child) {
41+
return new Subscriber<Observable<T>>(child) {
42+
43+
@Override
44+
public void onCompleted() {
45+
// ignore as we will keep repeating
46+
}
47+
48+
@Override
49+
public void onError(Throwable e) {
50+
child.onError(e);
51+
}
52+
53+
@Override
54+
public void onNext(final Observable<T> t) {
55+
scheduler.schedule(new Action1<Inner>() {
56+
57+
final Action1<Inner> self = this;
58+
59+
@Override
60+
public void call(final Inner inner) {
61+
62+
t.subscribe(new Subscriber<T>(child) {
63+
64+
@Override
65+
public void onCompleted() {
66+
inner.schedule(self);
67+
}
68+
69+
@Override
70+
public void onError(Throwable e) {
71+
child.onError(e);
72+
}
73+
74+
@Override
75+
public void onNext(T t) {
76+
child.onNext(t);
77+
}
78+
79+
});
80+
}
81+
82+
});
83+
}
84+
85+
};
86+
}
87+
}

rxjava-core/src/test/java/rx/operators/OperationRepeatTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorRepeatTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import rx.subscriptions.Subscriptions;
3232
import rx.util.functions.Func1;
3333

34-
public class OperationRepeatTest {
34+
public class OperatorRepeatTest {
3535

3636
@Test
3737
public void testRepetition() {

0 commit comments

Comments
 (0)