Skip to content

Commit 2bf40bc

Browse files
Merge branch 'master' of github.com:Netflix/RxJava into idiomaticscala
2 parents 171131c + 128e598 commit 2bf40bc

File tree

4 files changed

+285
-33
lines changed

4 files changed

+285
-33
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 70 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import rx.operators.OperationOnErrorResumeNextViaObservable;
5353
import rx.operators.OperationOnErrorReturn;
5454
import rx.operators.OperationOnExceptionResumeNextViaObservable;
55+
import rx.operators.OperationRetry;
5556
import rx.operators.OperationSample;
5657
import rx.operators.OperationScan;
5758
import rx.operators.OperationSkip;
@@ -1891,29 +1892,6 @@ public static <T> Observable<T> from(Future<? extends T> future, long timeout, T
18911892
return create(OperationToObservableFuture.toObservableFuture(future, timeout, unit));
18921893
}
18931894

1894-
/**
1895-
* <p>
1896-
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
1897-
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the
1898-
* new Observable will be the result of the function applied to the first item emitted by {@code w0} and the first item emitted by {@code w1}; the second item emitted by
1899-
* the new Observable will be the result of the function applied to the second item emitted by {@code w0} and the second item emitted by {@code w1}; and so forth.
1900-
* <p>
1901-
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
1902-
* of the source Observable that emits the fewest items.
1903-
*
1904-
* @param o1
1905-
* one source Observable
1906-
* @param o2
1907-
* another source Observable
1908-
* @param zipFunction
1909-
* a function that, when applied to an item emitted by each of the source
1910-
* Observables, results in an item that will be emitted by the resulting Observable
1911-
* @return an Observable that emits the zipped results
1912-
*/
1913-
public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> zipFunction) {
1914-
return create(OperationZip.zip(o1, o2, zipFunction));
1915-
}
1916-
19171895
/**
19181896
* Returns an Observable that emits Boolean values that indicate whether the pairs of items
19191897
* emitted by two source Observables are equal.
@@ -1960,6 +1938,31 @@ public static <T> Observable<Boolean> sequenceEqual(Observable<? extends T> firs
19601938
return zip(first, second, equality);
19611939
}
19621940

1941+
/**
1942+
* Returns an Observable that emits the results of a function of your choosing applied to
1943+
* combinations of two items emitted, in sequence, by two other Observables.
1944+
* <p>
1945+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
1946+
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the
1947+
* new Observable will be the result of the function applied to the first item emitted by {@code w0} and the first item emitted by {@code w1}; the second item emitted by
1948+
* the new Observable will be the result of the function applied to the second item emitted by {@code w0} and the second item emitted by {@code w1}; and so forth.
1949+
* <p>
1950+
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
1951+
* of the source Observable that emits the fewest items.
1952+
*
1953+
* @param o1
1954+
* one source Observable
1955+
* @param o2
1956+
* another source Observable
1957+
* @param zipFunction
1958+
* a function that, when applied to an item emitted by each of the source
1959+
* Observables, results in an item that will be emitted by the resulting Observable
1960+
* @return an Observable that emits the zipped results
1961+
*/
1962+
public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> zipFunction) {
1963+
return create(OperationZip.zip(o1, o2, zipFunction));
1964+
}
1965+
19631966
/**
19641967
* Returns an Observable that emits the results of a function of your choosing applied to
19651968
* combinations of three items emitted, in sequence, by three other Observables.
@@ -2021,7 +2024,7 @@ public static <T1, T2, T3, T4, R> Observable<R> zip(Observable<? extends T1> o1,
20212024

20222025
/**
20232026
* Returns an Observable that emits the results of a function of your choosing applied to
2024-
* combinations of four items emitted, in sequence, by four other Observables.
2027+
* combinations of five items emitted, in sequence, by five other Observables.
20252028
* <p>
20262029
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
20272030
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the
@@ -2054,7 +2057,7 @@ public static <T1, T2, T3, T4, T5, R> Observable<R> zip(Observable<? extends T1>
20542057

20552058
/**
20562059
* Returns an Observable that emits the results of a function of your choosing applied to
2057-
* combinations of four items emitted, in sequence, by four other Observables.
2060+
* combinations of six items emitted, in sequence, by six other Observables.
20582061
* <p>
20592062
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
20602063
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the
@@ -2090,7 +2093,7 @@ public static <T1, T2, T3, T4, T5, T6, R> Observable<R> zip(Observable<? extends
20902093

20912094
/**
20922095
* Returns an Observable that emits the results of a function of your choosing applied to
2093-
* combinations of four items emitted, in sequence, by four other Observables.
2096+
* combinations of seven items emitted, in sequence, by seven other Observables.
20942097
* <p>
20952098
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
20962099
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the
@@ -2128,7 +2131,7 @@ public static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> zip(Observable<? ext
21282131

21292132
/**
21302133
* Returns an Observable that emits the results of a function of your choosing applied to
2131-
* combinations of four items emitted, in sequence, by four other Observables.
2134+
* combinations of eight items emitted, in sequence, by eight other Observables.
21322135
* <p>
21332136
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
21342137
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the
@@ -2168,7 +2171,7 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> zip(Observable<?
21682171

21692172
/**
21702173
* Returns an Observable that emits the results of a function of your choosing applied to
2171-
* combinations of four items emitted, in sequence, by four other Observables.
2174+
* combinations of nine items emitted, in sequence, by nine other Observables.
21722175
* <p>
21732176
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
21742177
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the
@@ -2680,7 +2683,8 @@ public Observable<Observable<T>> window(long timespan, long timeshift, TimeUnit
26802683

26812684
/**
26822685
* Returns an Observable that emits the results of a function of your choosing applied to
2683-
* combinations of four items emitted, in sequence, by four other Observables.
2686+
* combinations of N items emitted, in sequence, by N other Observables as provided by an Iterable.
2687+
*
26842688
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the
26852689
* new Observable will be the result of the function applied to the first item emitted by
26862690
* all of the Observalbes; the second item emitted by the new Observable will be the result of
@@ -2727,7 +2731,7 @@ public Observable<R> call(List<? extends Observable<?>> wsList) {
27272731
* Observables, results in an item that will be emitted by the resulting Observable
27282732
* @return an Observable that emits the zipped results
27292733
*/
2730-
public static <R> Observable<R> zip(Collection<? extends Observable<?>> ws, FuncN<? extends R> zipFunction) {
2734+
public static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction) {
27312735
return create(OperationZip.zip(ws, zipFunction));
27322736
}
27332737

@@ -3125,6 +3129,42 @@ public static Observable<Double> averageDoubles(Observable<Double> source) {
31253129
public ConnectableObservable<T> replay() {
31263130
return OperationMulticast.multicast(this, ReplaySubject.<T> create());
31273131
}
3132+
3133+
/**
3134+
* Retry subscription to origin Observable upto given retry count.
3135+
* <p>
3136+
* If {@link Observer#onError} is invoked the source Observable will be re-subscribed to as many times as defined by retryCount.
3137+
* <p>
3138+
* Any {@link Observer#onNext} calls received on each attempt will be emitted and concatenated together.
3139+
* <p>
3140+
* For example, if an Observable fails on first time but emits [1, 2] then succeeds the second time and
3141+
* emits [1, 2, 3, 4, 5] then the complete output would be [1, 2, 1, 2, 3, 4, 5, onCompleted].
3142+
*
3143+
* @param retryCount
3144+
* Number of retry attempts before failing.
3145+
* @return Observable with retry logic.
3146+
*/
3147+
public Observable<T> retry(int retryCount) {
3148+
return create(OperationRetry.retry(this, retryCount));
3149+
}
3150+
3151+
/**
3152+
* Retry subscription to origin Observable whenever onError is called (infinite retry count).
3153+
* <p>
3154+
* If {@link Observer#onError} is invoked the source Observable will be re-subscribed to.
3155+
* <p>
3156+
* Any {@link Observer#onNext} calls received on each attempt will be emitted and concatenated together.
3157+
* <p>
3158+
* For example, if an Observable fails on first time but emits [1, 2] then succeeds the second time and
3159+
* emits [1, 2, 3, 4, 5] then the complete output would be [1, 2, 1, 2, 3, 4, 5, onCompleted].
3160+
*
3161+
* @param retryCount
3162+
* Number of retry attempts before failing.
3163+
* @return Observable with retry logic.
3164+
*/
3165+
public Observable<T> retry() {
3166+
return create(OperationRetry.retry(this));
3167+
}
31283168

31293169
/**
31303170
* This method has similar behavior to {@link #replay} except that this auto-subscribes to
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
package rx.operators;
2+
3+
/**
4+
* Copyright 2013 Netflix, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
import static org.mockito.Matchers.*;
19+
import static org.mockito.Mockito.*;
20+
21+
import java.util.concurrent.atomic.AtomicInteger;
22+
23+
import org.junit.Test;
24+
import org.mockito.InOrder;
25+
26+
import rx.Observable;
27+
import rx.Observable.OnSubscribeFunc;
28+
import rx.Observer;
29+
import rx.Subscription;
30+
import rx.concurrency.Schedulers;
31+
import rx.subscriptions.CompositeSubscription;
32+
import rx.subscriptions.Subscriptions;
33+
import rx.util.functions.Action0;
34+
35+
public class OperationRetry {
36+
37+
private static final int INFINITE_RETRY = -1;
38+
39+
public static <T> OnSubscribeFunc<T> retry(final Observable<T> observable, final int retryCount) {
40+
return new Retry<T>(observable, retryCount);
41+
}
42+
43+
public static <T> OnSubscribeFunc<T> retry(final Observable<T> observable) {
44+
return new Retry<T>(observable, INFINITE_RETRY);
45+
}
46+
47+
private static class Retry<T> implements OnSubscribeFunc<T> {
48+
49+
private final Observable<T> source;
50+
private final int retryCount;
51+
private final AtomicInteger attempts = new AtomicInteger(0);
52+
private final CompositeSubscription subscription = new CompositeSubscription();
53+
54+
public Retry(Observable<T> source, int retryCount) {
55+
this.source = source;
56+
this.retryCount = retryCount;
57+
}
58+
59+
@Override
60+
public Subscription onSubscribe(Observer<? super T> observer) {
61+
subscription.add(Schedulers.currentThread().schedule(attemptSubscription(observer)));
62+
return subscription;
63+
}
64+
65+
private Action0 attemptSubscription(final Observer<? super T> observer) {
66+
return new Action0() {
67+
68+
@Override
69+
public void call() {
70+
attempts.incrementAndGet();
71+
source.subscribe(new Observer<T>() {
72+
73+
@Override
74+
public void onCompleted() {
75+
observer.onCompleted();
76+
}
77+
78+
@Override
79+
public void onError(Throwable e) {
80+
if ((retryCount == INFINITE_RETRY || attempts.get() <= retryCount) && !subscription.isUnsubscribed()) {
81+
// retry again
82+
// remove the last subscription since we have completed (so as we retry we don't build up a huge list)
83+
subscription.removeLast();
84+
// add the new subscription and schedule a retry
85+
subscription.add(Schedulers.currentThread().schedule(attemptSubscription(observer)));
86+
} else {
87+
// give up and pass the failure
88+
observer.onError(e);
89+
}
90+
}
91+
92+
@Override
93+
public void onNext(T v) {
94+
observer.onNext(v);
95+
}
96+
});
97+
98+
}
99+
};
100+
}
101+
102+
}
103+
104+
public static class UnitTest {
105+
106+
@Test
107+
public void testOriginFails() {
108+
@SuppressWarnings("unchecked")
109+
Observer<String> observer = mock(Observer.class);
110+
Observable<String> origin = Observable.create(new FuncWithErrors(2));
111+
origin.subscribe(observer);
112+
113+
InOrder inOrder = inOrder(observer);
114+
inOrder.verify(observer, times(1)).onNext("beginningEveryTime");
115+
inOrder.verify(observer, times(1)).onError(any(RuntimeException.class));
116+
inOrder.verify(observer, never()).onNext("onSuccessOnly");
117+
inOrder.verify(observer, never()).onCompleted();
118+
}
119+
120+
@Test
121+
public void testRetryFail() {
122+
int NUM_RETRIES = 1;
123+
int NUM_FAILURES = 2;
124+
@SuppressWarnings("unchecked")
125+
Observer<String> observer = mock(Observer.class);
126+
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_FAILURES));
127+
Observable.create(retry(origin, NUM_RETRIES)).subscribe(observer);
128+
129+
InOrder inOrder = inOrder(observer);
130+
// should show 2 attempts (first time fail, second time (1st retry) fail)
131+
inOrder.verify(observer, times(1 + NUM_RETRIES)).onNext("beginningEveryTime");
132+
// should only retry once, fail again and emit onError
133+
inOrder.verify(observer, times(1)).onError(any(RuntimeException.class));
134+
// no success
135+
inOrder.verify(observer, never()).onNext("onSuccessOnly");
136+
inOrder.verify(observer, never()).onCompleted();
137+
inOrder.verifyNoMoreInteractions();
138+
}
139+
140+
@Test
141+
public void testRetrySuccess() {
142+
int NUM_RETRIES = 3;
143+
int NUM_FAILURES = 2;
144+
@SuppressWarnings("unchecked")
145+
Observer<String> observer = mock(Observer.class);
146+
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_FAILURES));
147+
Observable.create(retry(origin, NUM_RETRIES)).subscribe(observer);
148+
149+
InOrder inOrder = inOrder(observer);
150+
// should show 3 attempts
151+
inOrder.verify(observer, times(1 + NUM_FAILURES)).onNext("beginningEveryTime");
152+
// should have no errors
153+
inOrder.verify(observer, never()).onError(any(Throwable.class));
154+
// should have a single success
155+
inOrder.verify(observer, times(1)).onNext("onSuccessOnly");
156+
// should have a single successful onCompleted
157+
inOrder.verify(observer, times(1)).onCompleted();
158+
inOrder.verifyNoMoreInteractions();
159+
}
160+
161+
@Test
162+
public void testInfiniteRetry() {
163+
int NUM_FAILURES = 20;
164+
@SuppressWarnings("unchecked")
165+
Observer<String> observer = mock(Observer.class);
166+
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_FAILURES));
167+
Observable.create(retry(origin)).subscribe(observer);
168+
169+
InOrder inOrder = inOrder(observer);
170+
// should show 3 attempts
171+
inOrder.verify(observer, times(1 + NUM_FAILURES)).onNext("beginningEveryTime");
172+
// should have no errors
173+
inOrder.verify(observer, never()).onError(any(Throwable.class));
174+
// should have a single success
175+
inOrder.verify(observer, times(1)).onNext("onSuccessOnly");
176+
// should have a single successful onCompleted
177+
inOrder.verify(observer, times(1)).onCompleted();
178+
inOrder.verifyNoMoreInteractions();
179+
}
180+
181+
public static class FuncWithErrors implements OnSubscribeFunc<String> {
182+
183+
private final int numFailures;
184+
private final AtomicInteger count = new AtomicInteger(0);
185+
186+
FuncWithErrors(int count) {
187+
this.numFailures = count;
188+
}
189+
190+
@Override
191+
public Subscription onSubscribe(Observer<? super String> o) {
192+
o.onNext("beginningEveryTime");
193+
if (count.incrementAndGet() <= numFailures) {
194+
o.onError(new RuntimeException("forced failure: " + count.get()));
195+
} else {
196+
o.onNext("onSuccessOnly");
197+
o.onCompleted();
198+
}
199+
return Subscriptions.empty();
200+
}
201+
};
202+
}
203+
}

rxjava-core/src/main/java/rx/operators/OperationZip.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> OnSubscribeFunc<R> zip(Obs
147147
return a;
148148
}
149149

150-
public static <R> OnSubscribeFunc<R> zip(Collection<? extends Observable<?>> ws, FuncN<? extends R> zipFunction) {
150+
public static <R> OnSubscribeFunc<R> zip(Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction) {
151151
Aggregator<R> a = new Aggregator<R>(zipFunction);
152152
for (Observable<?> w : ws) {
153153
ZipObserver<R, Object> zipObserver = new ZipObserver<R, Object>(a, w);

0 commit comments

Comments
 (0)