Skip to content

Commit 6a402da

Browse files
committed
Merge branch 'master' of https://github.com/Netflix/RxJava into docs
2 parents 77a887b + 03fe25c commit 6a402da

File tree

4 files changed

+221
-45
lines changed

4 files changed

+221
-45
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

100644100755
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1055,6 +1055,30 @@ class RxScalaDemo extends JUnitSuite {
10551055
}.subscribe(s => println(s), e => e.printStackTrace())
10561056
}
10571057

1058+
@Test def retryWhenExample(): Unit = {
1059+
Observable[String]({ subscriber =>
1060+
println("subscribing")
1061+
subscriber.onError(new RuntimeException("always fails"))
1062+
}).retryWhen(attempts => {
1063+
attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
1064+
println("delay retry by " + i + " second(s)")
1065+
Observable.timer(Duration(i, TimeUnit.SECONDS))
1066+
})
1067+
}).toBlocking.foreach(s => println(s))
1068+
}
1069+
1070+
@Test def repeatWhenExample(): Unit = {
1071+
Observable[String]({ subscriber =>
1072+
println("subscribing")
1073+
subscriber.onCompleted()
1074+
}).repeatWhen(attempts => {
1075+
attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
1076+
println("delay repeat by " + i + " second(s)")
1077+
Observable.timer(Duration(i, TimeUnit.SECONDS)).materialize
1078+
})
1079+
}, NewThreadScheduler()).toBlocking.foreach(s => println(s))
1080+
}
1081+
10581082
@Test def liftExample1(): Unit = {
10591083
// Add "No. " in front of each item
10601084
val o = List(1, 2, 3).toObservable.lift {

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

100644100755
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3175,6 +3175,94 @@ trait Observable[+T]
31753175
toScalaObservable[T](asJavaObservable.retry(f))
31763176
}
31773177

3178+
/**
3179+
* Returns an Observable that emits the same values as the source observable with the exception of an
3180+
* {@code onError}. An {@code onError} notification from the source will result in the emission of a
3181+
* {@link Notification} to the Observable provided as an argument to the {@code notificationHandler}
3182+
* function. If the Observable returned {@code onCompletes} or {@code onErrors} then {@code retry} will call
3183+
* {@code onCompleted} or {@code onError} on the child subscription. Otherwise, this Observable will
3184+
* resubscribe to the source Observable.
3185+
* <p>
3186+
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/retryWhen.f.png" alt="">
3187+
*
3188+
* Example:
3189+
*
3190+
* This retries 3 times, each time incrementing the number of seconds it waits.
3191+
*
3192+
* <pre> {@code
3193+
* Observable[String]({ subscriber =>
3194+
* println("subscribing")
3195+
* subscriber.onError(new RuntimeException("always fails"))
3196+
* }).retryWhen(attempts => {
3197+
* attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
3198+
* println("delay retry by " + i + " second(s)")
3199+
* Observable.timer(Duration(i, TimeUnit.SECONDS))
3200+
* })
3201+
* }).toBlocking.foreach(s => println(s))
3202+
* } </pre>
3203+
*
3204+
* Output is:
3205+
*
3206+
* <pre> {@code
3207+
* subscribing
3208+
* delay retry by 1 second(s)
3209+
* subscribing
3210+
* delay retry by 2 second(s)
3211+
* subscribing
3212+
* delay retry by 3 second(s)
3213+
* subscribing
3214+
* } </pre>
3215+
* <dl>
3216+
* <dt><b>Scheduler:</b></dt>
3217+
* <dd>{@code retryWhen} operates by default on the {@code trampoline} {@link Scheduler}.</dd>
3218+
* </dl>
3219+
*
3220+
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the
3221+
* retry
3222+
* @return the source Observable modified with retry logic
3223+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#retrywhen">RxJava Wiki: retryWhen()</a>
3224+
* @since 0.20
3225+
*/
3226+
def retryWhen(notificationHandler: Observable[Notification[Any]] => Observable[Any]): Observable[T] = {
3227+
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] =
3228+
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
3229+
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
3230+
notificationHandler(on).asJavaObservable
3231+
}
3232+
3233+
toScalaObservable[T](asJavaObservable.retryWhen(f))
3234+
}
3235+
3236+
/**
3237+
* Returns an Observable that emits the same values as the source observable with the exception of an {@code onError}.
3238+
* An onError will emit a {@link Notification} to the observable provided as an argument to the notificationHandler
3239+
* func. If the observable returned {@code onCompletes} or {@code onErrors} then retry will call {@code onCompleted}
3240+
* or {@code onError} on the child subscription. Otherwise, this observable will resubscribe to the source observable, on a particular Scheduler.
3241+
* <p>
3242+
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/retryWhen.f.png" alt="">
3243+
* <p>
3244+
* <dl>
3245+
* <dt><b>Scheduler:</b></dt>
3246+
* <dd>you specify which {@link Scheduler} this operator will use</dd>
3247+
* </dl>
3248+
*
3249+
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the
3250+
* retry
3251+
* @param scheduler the Scheduler on which to subscribe to the source Observable
3252+
* @return the source Observable modified with retry logic
3253+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#retrywhen">RxJava Wiki: retryWhen()</a>
3254+
* @since 0.20
3255+
*/
3256+
def retryWhen(notificationHandler: Observable[Notification[Any]] => Observable[Notification[Any]], scheduler: Scheduler): Observable[T] = {
3257+
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: rx.Notification[_ <: Any]]] =
3258+
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
3259+
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
3260+
notificationHandler(on).map({ n => n.asJavaNotification }).asJavaObservable
3261+
}
3262+
3263+
toScalaObservable[T](asJavaObservable.retryWhen(f, scheduler))
3264+
}
3265+
31783266
/**
31793267
* Returns an Observable that repeats the sequence of items emitted by the source Observable indefinitely.
31803268
* <p>
@@ -3237,6 +3325,67 @@ trait Observable[+T]
32373325
toScalaObservable[T](asJavaObservable.repeat(count, scheduler))
32383326
}
32393327

3328+
/**
3329+
* Returns an Observable that emits the same values as the source Observable with the exception of an
3330+
* {@code onCompleted}. An {@code onCompleted} notification from the source will result in the emission of
3331+
* a {@link Notification} to the Observable provided as an argument to the {@code notificationHandler}
3332+
* function. If the Observable returned {@code onCompletes} or {@code onErrors} then {@code repeatWhen} will
3333+
* call {@code onCompleted} or {@code onError} on the child subscription. Otherwise, this Observable will
3334+
* resubscribe to the source Observable, on a particular Scheduler.
3335+
* <p>
3336+
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/repeatWhen.f.png" alt="">
3337+
* <dl>
3338+
* <dt><b>Scheduler:</b></dt>
3339+
* <dd>you specify which {@link Scheduler} this operator will use</dd>
3340+
* </dl>
3341+
*
3342+
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the repeat.
3343+
* @param scheduler the Scheduler to emit the items on
3344+
* @return the source Observable modified with repeat logic
3345+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#repeatwhen">RxJava Wiki: repeatWhen()</a>
3346+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
3347+
* @since 0.20
3348+
*/
3349+
def repeatWhen(notificationHandler: Observable[Notification[Any]] => Observable[Notification[Any]], scheduler: Scheduler): Observable[T] = {
3350+
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: rx.Notification[_ <: Any]]] =
3351+
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
3352+
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
3353+
notificationHandler(on).map({ n => n.asJavaNotification }).asJavaObservable
3354+
}
3355+
3356+
toScalaObservable[T](asJavaObservable.repeatWhen(f, scheduler))
3357+
}
3358+
3359+
/**
3360+
* Returns an Observable that emits the same values as the source Observable with the exception of an
3361+
* {@code onCompleted}. An {@code onCompleted} notification from the source will result in the emission of
3362+
* a {@link Notification} to the Observable provided as an argument to the {@code notificationHandler}
3363+
* function. If the Observable returned {@code onCompletes} or {@code onErrors} then {@code repeatWhen} will
3364+
* call {@code onCompleted} or {@code onError} on the child subscription. Otherwise, this Observable will
3365+
* resubscribe to the source observable.
3366+
* <p>
3367+
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/repeatWhen.f.png" alt="">
3368+
* <dl>
3369+
* <dt><b>Scheduler:</b></dt>
3370+
* <dd>{@code repeatWhen} operates by default on the {@code trampoline} {@link Scheduler}.</dd>
3371+
* </dl>
3372+
*
3373+
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the repeat.
3374+
* @return the source Observable modified with repeat logic
3375+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#repeatwhen">RxJava Wiki: repeatWhen()</a>
3376+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
3377+
* @since 0.20
3378+
*/
3379+
def repeatWhen(notificationHandler: Observable[Notification[Any]] => Observable[Notification[Any]]): Observable[T] = {
3380+
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: rx.Notification[_ <: Any]]] =
3381+
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
3382+
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
3383+
notificationHandler(on).map({ n => n.asJavaNotification }).asJavaObservable
3384+
}
3385+
3386+
toScalaObservable[T](asJavaObservable.repeatWhen(f))
3387+
}
3388+
32403389
/**
32413390
* Converts an Observable into a [[BlockingObservable]] (an Observable with blocking operators).
32423391
*

rxjava-core/src/main/java/rx/internal/operators/BlockingOperatorMostRecent.java

Lines changed: 44 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.internal.operators;
1717

1818
import java.util.Iterator;
19+
import java.util.NoSuchElementException;
1920

2021
import rx.Observable;
2122
import rx.Subscriber;
@@ -41,54 +42,24 @@ public final class BlockingOperatorMostRecent {
4142
* {@code initialValue} if {@code source} has not yet emitted any items
4243
*/
4344
public static <T> Iterable<T> mostRecent(final Observable<? extends T> source, final T initialValue) {
44-
4545
return new Iterable<T>() {
4646
@Override
4747
public Iterator<T> iterator() {
4848
MostRecentObserver<T> mostRecentObserver = new MostRecentObserver<T>(initialValue);
49-
final MostRecentIterator<T> nextIterator = new MostRecentIterator<T>(mostRecentObserver);
5049

5150
/**
5251
* Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain
5352
* since it is for BlockingObservable.
5453
*/
5554
source.subscribe(mostRecentObserver);
5655

57-
return nextIterator;
56+
return mostRecentObserver.getIterable();
5857
}
5958
};
60-
61-
}
62-
63-
private static class MostRecentIterator<T> implements Iterator<T> {
64-
65-
private final MostRecentObserver<T> observer;
66-
67-
private MostRecentIterator(MostRecentObserver<T> observer) {
68-
this.observer = observer;
69-
}
70-
71-
@Override
72-
public boolean hasNext() {
73-
return !observer.isCompleted();
74-
}
75-
76-
@Override
77-
public T next() {
78-
if (observer.getThrowable() != null) {
79-
throw Exceptions.propagate(observer.getThrowable());
80-
}
81-
return observer.getRecentValue();
82-
}
83-
84-
@Override
85-
public void remove() {
86-
throw new UnsupportedOperationException("Read only iterator");
87-
}
8859
}
8960

9061
private static class MostRecentObserver<T> extends Subscriber<T> {
91-
static final NotificationLite<Object> nl = NotificationLite.instance();
62+
final NotificationLite<T> nl = NotificationLite.instance();
9263
volatile Object value;
9364

9465
private MostRecentObserver(T value) {
@@ -110,19 +81,47 @@ public void onNext(T args) {
11081
value = nl.next(args);
11182
}
11283

113-
private boolean isCompleted() {
114-
return nl.isCompleted(value);
115-
}
116-
117-
private Throwable getThrowable() {
118-
Object v = value;
119-
return nl.isError(v) ? nl.getError(v) : null;
120-
}
121-
122-
@SuppressWarnings("unchecked")
123-
private T getRecentValue() {
124-
return (T)value;
84+
/**
85+
* The {@link Iterator} return is not thread safe. In other words don't call {@link Iterator#hasNext()} in one
86+
* thread expect {@link Iterator#next()} called from a different thread to work.
87+
* @return
88+
*/
89+
public Iterator<T> getIterable() {
90+
return new Iterator<T>() {
91+
/**
92+
* buffer to make sure that the state of the iterator doesn't change between calling hasNext() and next().
93+
*/
94+
private Object buf = null;
95+
96+
@Override
97+
public boolean hasNext() {
98+
buf = value;
99+
return !nl.isCompleted(buf);
100+
}
101+
102+
@Override
103+
public T next() {
104+
try {
105+
// if hasNext wasn't called before calling next.
106+
if (buf == null)
107+
buf = value;
108+
if (nl.isCompleted(buf))
109+
throw new NoSuchElementException();
110+
if (nl.isError(buf)) {
111+
throw Exceptions.propagate(nl.getError(buf));
112+
}
113+
return nl.getValue(buf);
114+
}
115+
finally {
116+
buf = null;
117+
}
118+
}
119+
120+
@Override
121+
public void remove() {
122+
throw new UnsupportedOperationException("Read only iterator");
123+
}
124+
};
125125
}
126-
127126
}
128127
}

rxjava-core/src/test/java/rx/internal/operators/BlockingOperatorMostRecentTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@
3434
import rx.subjects.Subject;
3535

3636
public class BlockingOperatorMostRecentTest {
37+
@Test
38+
public void testMostRecentNull() {
39+
assertEquals(null, Observable.<Void>never().toBlocking().mostRecent(null).iterator().next());
40+
}
3741

3842
@Test
3943
public void testMostRecent() {

0 commit comments

Comments
 (0)