1515 */
1616package rx .operators ;
1717
18- import static org .junit .Assert .*;
19- import static org .mockito .Matchers .*;
20- import static org .mockito .Mockito .*;
18+ import static org .junit .Assert .assertEquals ;
19+ import static org .mockito .Matchers .any ;
20+ import static org .mockito .Mockito .inOrder ;
21+ import static org .mockito .Mockito .never ;
22+ import static org .mockito .Mockito .times ;
23+ import static org .mockito .Mockito .verify ;
2124
2225import java .util .HashMap ;
2326import java .util .Map ;
27+ import java .util .concurrent .CountDownLatch ;
2428import java .util .concurrent .atomic .AtomicInteger ;
2529
2630import org .junit .Before ;
3337import rx .Observable .OnSubscribeFunc ;
3438import rx .Observer ;
3539import rx .Subscription ;
40+ import rx .concurrency .Schedulers ;
3641import rx .util .functions .Func1 ;
3742import rx .util .functions .Func2 ;
3843
@@ -59,17 +64,12 @@ public final class OperationMap {
5964 * @return a sequence that is the result of applying the transformation function to each item in the input sequence.
6065 */
6166 public static <T , R > OnSubscribeFunc <R > map (final Observable <? extends T > sequence , final Func1 <? super T , ? extends R > func ) {
62- return new OnSubscribeFunc <R >() {
63- @ Override
64- public Subscription onSubscribe (Observer <? super R > observer ) {
65- return new MapObservable <T , R >(sequence , new Func2 <T , Integer , R >() {
67+ return mapWithIndex (sequence , new Func2 <T , Integer , R >() {
6668 @ Override
6769 public R call (T value , @ SuppressWarnings ("unused" ) Integer unused ) {
6870 return func .call (value );
6971 }
70- }).onSubscribe (observer );
71- }
72- };
72+ });
7373 }
7474
7575 /**
@@ -136,7 +136,8 @@ public MapObservable(Observable<? extends T> sequence, Func2<? super T, Integer,
136136
137137 @ Override
138138 public Subscription onSubscribe (final Observer <? super R > observer ) {
139- return sequence .subscribe (new Observer <T >() {
139+ final SafeObservableSubscription subscription = new SafeObservableSubscription ();
140+ return subscription .wrap (sequence .subscribe (new SafeObserver <T >(subscription , new Observer <T >() {
140141 @ Override
141142 public void onNext (T value ) {
142143 observer .onNext (func .call (value , index ));
@@ -152,7 +153,7 @@ public void onError(Throwable ex) {
152153 public void onCompleted () {
153154 observer .onCompleted ();
154155 }
155- });
156+ }))) ;
156157 }
157158 }
158159
@@ -366,6 +367,41 @@ public String call(String s) {
366367 assertEquals (1 , c2 .get ());
367368 }
368369
370+ @ Test (expected = IllegalArgumentException .class )
371+ public void testMapWithIssue417 () {
372+ Observable .from (1 ).observeOn (Schedulers .threadPoolForComputation ())
373+ .map (new Func1 <Integer , Integer >() {
374+ public Integer call (Integer arg0 ) {
375+ throw new IllegalArgumentException ("any error" );
376+ }
377+ }).toBlockingObservable ().single ();
378+ }
379+
380+ @ Test
381+ public void testMapWithErrorInFuncAndThreadPoolScheduler () throws InterruptedException {
382+ // The error will throw in one of threads in the thread pool.
383+ // If map does not handle it, the error will disappear.
384+ // so map needs to handle the error by itself.
385+ final CountDownLatch latch = new CountDownLatch (1 );
386+ Observable <String > m = Observable .from ("one" )
387+ .observeOn (Schedulers .threadPoolForComputation ())
388+ .map (new Func1 <String , String >() {
389+ public String call (String arg0 ) {
390+ try {
391+ throw new IllegalArgumentException ("any error" );
392+ } finally {
393+ latch .countDown ();
394+ }
395+ }
396+ });
397+
398+ m .subscribe (stringObserver );
399+ latch .await ();
400+ InOrder inorder = inOrder (stringObserver );
401+ inorder .verify (stringObserver , times (1 )).onError (any (IllegalArgumentException .class ));
402+ inorder .verifyNoMoreInteractions ();
403+ }
404+
369405 private static Map <String , String > getMap (String prefix ) {
370406 Map <String , String > m = new HashMap <String , String >();
371407 m .put ("firstName" , prefix + "First" );
0 commit comments