Skip to content

Commit 4250d27

Browse files
author
jmhofer
committed
handled combine functions that throw exceptions and added a test against this case
1 parent 7ce53f0 commit 4250d27

File tree

1 file changed

+38
-6
lines changed

1 file changed

+38
-6
lines changed

rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import org.junit.Test;
3232
import org.mockito.InOrder;
33+
import org.mockito.Matchers;
3334

3435
import rx.Observable;
3536
import rx.Observer;
@@ -47,9 +48,12 @@ public class OperationCombineLatest {
4748
/**
4849
* Combines the two given observables, emitting an event containing an aggregation of the latest values of each of the source observables
4950
* each time an event is received from one of the source observables, where the aggregation is defined by the given function.
50-
* @param w0 The first source observable.
51-
* @param w1 The second source observable.
52-
* @param combineLatestFunction The aggregation function used to combine the source observable values.
51+
* @param w0
52+
* The first source observable.
53+
* @param w1
54+
* The second source observable.
55+
* @param combineLatestFunction
56+
* The aggregation function used to combine the source observable values.
5357
* @return A function from an observer to a subscription. This can be used to create an observable from.
5458
*/
5559
public static <T0, T1, R> Func1<Observer<R>, Subscription> combineLatest(Observable<T0> w0, Observable<T1> w1, Func2<T0, T1, R> combineLatestFunction) {
@@ -236,7 +240,12 @@ <T> void next(CombineObserver<R, T> w, T arg) {
236240
// if we did not return above from the synchronized block we can now invoke the combineLatestFunction with all of the args
237241
// we do this outside the synchronized block as it is now safe to call this concurrently and don't need to block other threads from calling
238242
// this 'next' method while another thread finishes calling this combineLatestFunction
239-
observer.onNext(combineLatestFunction.call(argsToCombineLatest));
243+
try {
244+
R combinedValue = combineLatestFunction.call(argsToCombineLatest);
245+
observer.onNext(combinedValue);
246+
} catch(Exception ex) {
247+
observer.onError(ex);
248+
}
240249
}
241250

242251
@Override
@@ -273,10 +282,33 @@ private void stop() {
273282

274283
public static class UnitTest {
275284

276-
@SuppressWarnings("unchecked")
277-
/* mock calls don't do generics */
285+
@Test
286+
public void testCombineLatestWithFunctionThatThrowsAnException() {
287+
@SuppressWarnings("unchecked") // mock calls don't do generics
288+
Observer<String> w = mock(Observer.class);
289+
290+
TestObservable w1 = new TestObservable();
291+
TestObservable w2 = new TestObservable();
292+
293+
Observable<String> combined = Observable.create(combineLatest(w1, w2, new Func2<String, String, String>() {
294+
@Override
295+
public String call(String v1, String v2) {
296+
throw new RuntimeException("I don't work.");
297+
}
298+
}));
299+
combined.subscribe(w);
300+
301+
w1.Observer.onNext("first value of w1");
302+
w2.Observer.onNext("first value of w2");
303+
304+
verify(w, never()).onNext(anyString());
305+
verify(w, never()).onCompleted();
306+
verify(w, times(1)).onError(Matchers.<RuntimeException>any());
307+
}
308+
278309
@Test
279310
public void testCombineLatestDifferentLengthObservableSequences1() {
311+
@SuppressWarnings("unchecked") // mock calls don't do generics
280312
Observer<String> w = mock(Observer.class);
281313

282314
TestObservable w1 = new TestObservable();

0 commit comments

Comments
 (0)