Skip to content

Commit bdd91eb

Browse files
Merge pull request #189 from benjchristensen/polishing
Handful of Improvements
2 parents b29508a + 53ce8d3 commit bdd91eb

File tree

2 files changed

+75
-18
lines changed

2 files changed

+75
-18
lines changed

language-adaptors/rxjava-groovy/src/examples/groovy/rx/lang/groovy/examples/VideoExample.groovy

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -108,19 +108,27 @@ Observable getVideoGridForDisplay(userId) {
108108
Observable<VideoList> getListOfLists(userId) {
109109
return Observable.create({ observer ->
110110
BooleanSubscription subscription = new BooleanSubscription();
111-
// this will happen on a separate thread as it requires a network call
112-
executor.execute({
113-
// simulate network latency
114-
Thread.sleep(180);
115-
for(i in 0..15) {
116-
if(subscription.isUnsubscribed()) {
117-
break;
111+
try {
112+
// this will happen on a separate thread as it requires a network call
113+
executor.execute({
114+
// simulate network latency
115+
Thread.sleep(180);
116+
for(i in 0..15) {
117+
if(subscription.isUnsubscribed()) {
118+
break;
119+
}
120+
try {
121+
//println("****** emitting list: " + i)
122+
observer.onNext(new VideoList(i))
123+
}catch(Exception e) {
124+
observer.onError(e);
125+
}
118126
}
119-
//println("****** emitting list: " + i)
120-
observer.onNext(new VideoList(i))
121-
}
122-
observer.onCompleted();
123-
})
127+
observer.onCompleted();
128+
})
129+
}catch(Exception e) {
130+
observer.onError(e);
131+
}
124132
return subscription;
125133
})
126134
}

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 55 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.util.concurrent.CountDownLatch;
2828
import java.util.concurrent.Future;
2929
import java.util.concurrent.TimeUnit;
30-
import java.util.concurrent.TimeoutException;
3130
import java.util.concurrent.atomic.AtomicReference;
3231

3332
import org.junit.Before;
@@ -156,11 +155,30 @@ public Subscription subscribe(Observer<T> observer) {
156155
throw new IllegalStateException("onSubscribe function can not be null.");
157156
// the subscribe function can also be overridden but generally that's not the appropriate approach so I won't mention that in the exception
158157
}
159-
if (isTrusted) {
160-
return onSubscribe.call(observer);
161-
} else {
162-
AtomicObservableSubscription subscription = new AtomicObservableSubscription();
163-
return subscription.wrap(onSubscribe.call(new AtomicObserver<T>(subscription, observer)));
158+
try {
159+
if (isTrusted) {
160+
Subscription s = onSubscribe.call(observer);
161+
if (s == null) {
162+
// this generally shouldn't be the case on a 'trusted' onSubscribe but in case it happens
163+
// we want to gracefully handle it the same as AtomicObservableSubscription does
164+
return Subscriptions.empty();
165+
} else {
166+
return s;
167+
}
168+
} else {
169+
AtomicObservableSubscription subscription = new AtomicObservableSubscription();
170+
return subscription.wrap(onSubscribe.call(new AtomicObserver<T>(subscription, observer)));
171+
}
172+
} catch (Exception e) {
173+
// if an unhandled error occurs executing the onSubscribe we will propagate it
174+
try {
175+
observer.onError(e);
176+
} catch (Exception e2) {
177+
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
178+
// so we are unable to propagate the error correctly and will just throw
179+
throw new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
180+
}
181+
return Subscriptions.empty();
164182
}
165183
}
166184

@@ -415,6 +433,18 @@ public void onNext(T args) {
415433
}
416434
}
417435

436+
/**
437+
* Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated.
438+
* <p>
439+
* NOTE: This will block even if the Observable is asynchronous.
440+
* <p>
441+
* This is similar to {@link #subscribe(Observer)} but blocks. Because it blocks it does not need the {@link Observer#onCompleted()} or {@link Observer#onError(Exception)} methods.
442+
*
443+
* @param onNext
444+
* {@link Action1}
445+
* @throws RuntimeException
446+
* if error occurs
447+
*/
418448
@SuppressWarnings({ "rawtypes", "unchecked" })
419449
public void forEach(final Object o) {
420450
if (o instanceof Action1) {
@@ -3405,6 +3435,25 @@ public Boolean call(String s) {
34053435
}));
34063436
}
34073437

3438+
@Test
3439+
public void testOnSubscribeFails() {
3440+
@SuppressWarnings("unchecked")
3441+
Observer<String> observer = mock(Observer.class);
3442+
final RuntimeException re = new RuntimeException("bad impl");
3443+
Observable<String> o = Observable.create(new Func1<Observer<String>, Subscription>() {
3444+
3445+
@Override
3446+
public Subscription call(Observer<String> t1) {
3447+
throw re;
3448+
}
3449+
3450+
});
3451+
o.subscribe(observer);
3452+
verify(observer, times(0)).onNext(anyString());
3453+
verify(observer, times(0)).onCompleted();
3454+
verify(observer, times(1)).onError(re);
3455+
}
3456+
34083457
@Test
34093458
public void testLastEmptyObservable() {
34103459
Observable<String> obs = Observable.toObservable();

0 commit comments

Comments
 (0)