File tree Expand file tree Collapse file tree 2 files changed +25
-4
lines changed
main/java/rx/internal/operators
test/java/rx/internal/operators Expand file tree Collapse file tree 2 files changed +25
-4
lines changed Original file line number Diff line number Diff line change 1717
1818import rx .Observable .Operator ;
1919import rx .Subscriber ;
20+ import rx .exceptions .Exceptions ;
21+ import rx .exceptions .OnErrorThrowable ;
2022import rx .functions .Func1 ;
2123import rx .functions .Func2 ;
2224
@@ -52,18 +54,19 @@ public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
5254 private boolean done = false ;
5355
5456 @ Override
55- public void onNext (T args ) {
57+ public void onNext (T t ) {
5658 boolean isSelected ;
5759 try {
58- isSelected = predicate .call (args , counter ++);
60+ isSelected = predicate .call (t , counter ++);
5961 } catch (Throwable e ) {
6062 done = true ;
61- subscriber .onError (e );
63+ Exceptions .throwIfFatal (e );
64+ subscriber .onError (OnErrorThrowable .addValueAsLastCause (e , t ));
6265 unsubscribe ();
6366 return ;
6467 }
6568 if (isSelected ) {
66- subscriber .onNext (args );
69+ subscriber .onNext (t );
6770 } else {
6871 done = true ;
6972 subscriber .onCompleted ();
Original file line number Diff line number Diff line change 1515 */
1616package rx .internal .operators ;
1717
18+ import static org .junit .Assert .assertTrue ;
1819import static org .junit .Assert .fail ;
1920import static org .mockito .Matchers .any ;
2021import static org .mockito .Mockito .*;
2526
2627import rx .*;
2728import rx .Observable .OnSubscribe ;
29+ import rx .exceptions .TestException ;
2830import rx .functions .Func1 ;
2931import rx .observers .TestSubscriber ;
3032import rx .subjects .*;
@@ -261,4 +263,20 @@ public Boolean call(Integer t1) {
261263
262264 Assert .assertFalse ("Unsubscribed!" , ts .isUnsubscribed ());
263265 }
266+
267+ @ Test
268+ public void testErrorCauseIncludesLastValue () {
269+ TestSubscriber <String > ts = new TestSubscriber <String >();
270+ Observable .just ("abc" ).takeWhile (new Func1 <String , Boolean >() {
271+ @ Override
272+ public Boolean call (String t1 ) {
273+ throw new TestException ();
274+ }
275+ }).subscribe (ts );
276+
277+ ts .assertTerminalEvent ();
278+ ts .assertNoValues ();
279+ assertTrue (ts .getOnErrorEvents ().get (0 ).getCause ().getMessage ().contains ("abc" ));
280+ }
281+
264282}
You can’t perform that action at this time.
0 commit comments