2424import static org .mockito .Mockito .never ;
2525import static org .mockito .Mockito .times ;
2626
27- import java .util .concurrent .CountDownLatch ;
28- import java .util .concurrent .TimeUnit ;
29- import java .util .concurrent .atomic .AtomicBoolean ;
30- import java .util .concurrent .atomic .AtomicInteger ;
31-
3227import org .junit .Test ;
3328import org .mockito .InOrder ;
3429import org .mockito .Mockito ;
3530
3631import rx .Observable ;
3732import rx .Observable .OnSubscribe ;
38- import rx .Notification ;
3933import rx .Observer ;
4034import rx .Subscriber ;
4135import rx .Subscription ;
4943import rx .subjects .PublishSubject ;
5044import rx .subscriptions .Subscriptions ;
5145
46+ import java .util .concurrent .CountDownLatch ;
47+ import java .util .concurrent .TimeUnit ;
48+ import java .util .concurrent .atomic .AtomicBoolean ;
49+ import java .util .concurrent .atomic .AtomicInteger ;
50+
5251public class OperatorRetryTest {
5352
5453 @ Test
@@ -73,15 +72,15 @@ public void call(Subscriber<? super String> t1) {
7372
7473 });
7574 TestSubscriber <String > ts = new TestSubscriber <String >(consumer );
76- producer .retryWhen (new Func1 <Observable <? extends Notification <?> >, Observable <?>>() {
75+ producer .retryWhen (new Func1 <Observable <? extends Throwable >, Observable <?>>() {
7776
7877 @ Override
79- public Observable <?> call (Observable <? extends Notification <?> > attempts ) {
78+ public Observable <?> call (Observable <? extends Throwable > attempts ) {
8079 // Worker w = Schedulers.computation().createWorker();
8180 return attempts
82- .map (new Func1 <Notification <?> , Tuple >() {
81+ .map (new Func1 <Throwable , Tuple >() {
8382 @ Override
84- public Tuple call (Notification <?> n ) {
83+ public Tuple call (Throwable n ) {
8584 return new Tuple (new Long (1 ), n );
8685 }})
8786 .scan (new Func2 <Tuple , Tuple , Tuple >(){
@@ -94,7 +93,7 @@ public Tuple call(Tuple t, Tuple n) {
9493 public Observable <Long > call (Tuple t ) {
9594 System .out .println ("Retry # " +t .count );
9695 return t .count > 20 ?
97- Observable .<Long >error (t .n . getThrowable () ) :
96+ Observable .<Long >error (t .n ) :
9897 Observable .timer (t .count *1L , TimeUnit .MILLISECONDS );
9998 }});
10099 }
@@ -112,9 +111,9 @@ public Observable<Long> call(Tuple t) {
112111
113112 public static class Tuple {
114113 Long count ;
115- Notification <?> n ;
114+ Throwable n ;
116115
117- Tuple (Long c , Notification <?> n ) {
116+ Tuple (Long c , Throwable n ) {
118117 count = c ;
119118 this .n = n ;
120119 }
@@ -147,15 +146,15 @@ public void testSchedulingNotificationHandler() {
147146 int NUM_RETRIES = 2 ;
148147 Observable <String > origin = Observable .create (new FuncWithErrors (NUM_RETRIES ));
149148 TestSubscriber <String > subscriber = new TestSubscriber <String >(observer );
150- origin .retryWhen (new Func1 <Observable <? extends Notification <?>> , Observable <? extends Notification <?> >>() {
149+ origin .retryWhen (new Func1 <Observable <? extends Throwable > , Observable <?>>() {
151150 @ Override
152- public Observable <? extends Notification <?>> call (Observable <? extends Notification <?> > t1 ) {
153- return t1 .observeOn (Schedulers .computation ()).map (new Func1 <Notification <?>, Notification <?> >() {
151+ public Observable <?> call (Observable <? extends Throwable > t1 ) {
152+ return t1 .observeOn (Schedulers .computation ()).map (new Func1 <Throwable , Void >() {
154153 @ Override
155- public Notification <?> call (Notification <?> t1 ) {
156- return Notification . createOnNext ( null ) ;
154+ public Void call (Throwable t1 ) {
155+ return null ;
157156 }
158- }).startWith (Notification . createOnNext ( null ) );
157+ }).startWith (( Void ) null );
159158 }
160159 }).subscribe (subscriber );
161160
@@ -178,16 +177,16 @@ public void testOnNextFromNotificationHandler() {
178177 Observer <String > observer = mock (Observer .class );
179178 int NUM_RETRIES = 2 ;
180179 Observable <String > origin = Observable .create (new FuncWithErrors (NUM_RETRIES ));
181- origin .retryWhen (new Func1 <Observable <? extends Notification <?>> , Observable <? extends Notification <?> >>() {
180+ origin .retryWhen (new Func1 <Observable <? extends Throwable > , Observable <?>>() {
182181 @ Override
183- public Observable <? extends Notification <?>> call (Observable <? extends Notification <?> > t1 ) {
184- return t1 .map (new Func1 <Notification <?>, Notification <?> >() {
182+ public Observable <?> call (Observable <? extends Throwable > t1 ) {
183+ return t1 .map (new Func1 <Throwable , Void >() {
185184
186185 @ Override
187- public Notification <?> call (Notification <?> t1 ) {
188- return Notification . createOnNext ( null ) ;
186+ public Void call (Throwable t1 ) {
187+ return null ;
189188 }
190- }).startWith (Notification . createOnNext ( null ) );
189+ }).startWith (( Void ) null );
191190 }
192191 }).subscribe (observer );
193192
@@ -209,9 +208,9 @@ public void testOnCompletedFromNotificationHandler() {
209208 Observer <String > observer = mock (Observer .class );
210209 Observable <String > origin = Observable .create (new FuncWithErrors (1 ));
211210 TestSubscriber <String > subscriber = new TestSubscriber <String >(observer );
212- origin .retryWhen (new Func1 <Observable <? extends Notification <?>> , Observable <? extends Notification <?> >>() {
211+ origin .retryWhen (new Func1 <Observable <? extends Throwable > , Observable <?>>() {
213212 @ Override
214- public Observable <? extends Notification <?>> call (Observable <? extends Notification <?> > t1 ) {
213+ public Observable <?> call (Observable <? extends Throwable > t1 ) {
215214 return Observable .empty ();
216215 }
217216 }).subscribe (subscriber );
@@ -229,9 +228,9 @@ public void testOnErrorFromNotificationHandler() {
229228 @ SuppressWarnings ("unchecked" )
230229 Observer <String > observer = mock (Observer .class );
231230 Observable <String > origin = Observable .create (new FuncWithErrors (2 ));
232- origin .retryWhen (new Func1 <Observable <? extends Notification <?>> , Observable <? extends Notification <?> >>() {
231+ origin .retryWhen (new Func1 <Observable <? extends Throwable > , Observable <?>>() {
233232 @ Override
234- public Observable <? extends Notification <?>> call (Observable <? extends Notification <?> > t1 ) {
233+ public Observable <?> call (Observable <? extends Throwable > t1 ) {
235234 return Observable .error (new RuntimeException ());
236235 }
237236 }).subscribe (observer );
0 commit comments