File tree Expand file tree Collapse file tree 2 files changed +34
-2
lines changed
main/java/io/reactivex/internal/operators/flowable
test/java/io/reactivex/internal/operators/flowable Expand file tree Collapse file tree 2 files changed +34
-2
lines changed Original file line number Diff line number Diff line change 2121import io .reactivex .exceptions .Exceptions ;
2222import io .reactivex .internal .functions .ObjectHelper ;
2323import io .reactivex .internal .subscriptions .DeferredScalarSubscription ;
24+ import io .reactivex .plugins .RxJavaPlugins ;
2425
2526public final class FlowableFromCallable <T > extends Flowable <T > implements Callable <T > {
2627 final Callable <? extends T > callable ;
@@ -38,7 +39,11 @@ public void subscribeActual(Subscriber<? super T> s) {
3839 t = ObjectHelper .requireNonNull (callable .call (), "The callable returned a null value" );
3940 } catch (Throwable ex ) {
4041 Exceptions .throwIfFatal (ex );
41- s .onError (ex );
42+ if (deferred .isCancelled ()) {
43+ RxJavaPlugins .onError (ex );
44+ } else {
45+ s .onError (ex );
46+ }
4247 return ;
4348 }
4449
Original file line number Diff line number Diff line change 1616
1717package io .reactivex .internal .operators .flowable ;
1818
19+ import static org .junit .Assert .assertEquals ;
20+ import static org .mockito .ArgumentMatchers .any ;
1921import static org .mockito .Mockito .*;
20- import static org .junit .Assert .*;
2122
23+ import java .util .List ;
2224import java .util .concurrent .*;
2325
2426import org .junit .Test ;
2729import org .reactivestreams .*;
2830
2931import io .reactivex .*;
32+ import io .reactivex .exceptions .TestException ;
3033import io .reactivex .functions .Function ;
34+ import io .reactivex .plugins .RxJavaPlugins ;
3135import io .reactivex .schedulers .Schedulers ;
3236import io .reactivex .subscribers .TestSubscriber ;
3337
@@ -238,4 +242,27 @@ public Object call() throws Exception {
238242 .test ()
239243 .assertFailure (NullPointerException .class );
240244 }
245+
246+ @ Test (timeout = 5000 )
247+ public void undeliverableUponCancellation () throws Exception {
248+ List <Throwable > errors = TestHelper .trackPluginErrors ();
249+ try {
250+ final TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
251+
252+ Flowable .fromCallable (new Callable <Integer >() {
253+ @ Override
254+ public Integer call () throws Exception {
255+ ts .cancel ();
256+ throw new TestException ();
257+ }
258+ })
259+ .subscribe (ts );
260+
261+ ts .assertEmpty ();
262+
263+ TestHelper .assertUndeliverable (errors , 0 , TestException .class );
264+ } finally {
265+ RxJavaPlugins .reset ();
266+ }
267+ }
241268}
You can’t perform that action at this time.
0 commit comments