Skip to content

Commit ed10ea6

Browse files
committed
Add onErrorFlatMap to RxScala
1 parent ea37e72 commit ed10ea6

File tree

3 files changed

+59
-0
lines changed

3 files changed

+59
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1176,4 +1176,38 @@ class RxScalaDemo extends JUnitSuite {
11761176
.take(20)
11771177
.toBlocking.foreach(println)
11781178
}
1179+
1180+
@Test def onErrorResumeNextExample() {
1181+
val o = Observable {
1182+
(subscriber: Subscriber[Int]) =>
1183+
subscriber.onNext(1)
1184+
subscriber.onNext(2)
1185+
subscriber.onError(new IOException("Oops"))
1186+
subscriber.onNext(3)
1187+
subscriber.onNext(4)
1188+
}
1189+
o.onErrorResumeNext(_ => Observable.items(10, 11, 12)).subscribe(println(_))
1190+
}
1191+
1192+
@Test def onErrorFlatMapExample() {
1193+
val o = Observable {
1194+
(subscriber: Subscriber[Int]) =>
1195+
subscriber.onNext(1)
1196+
subscriber.onNext(2)
1197+
subscriber.onError(new IOException("Oops"))
1198+
subscriber.onNext(3)
1199+
subscriber.onNext(4)
1200+
}
1201+
o.onErrorFlatMap((_, _) => Observable.items(10, 11, 12)).subscribe(println(_))
1202+
}
1203+
1204+
@Test def onErrorFlatMapExample2() {
1205+
val o = Observable.items(4, 2, 0).map(16 / _).onErrorFlatMap {
1206+
(e, op) => op match {
1207+
case Some(v) if v == 0 => Observable.items(Int.MinValue)
1208+
case _ => Observable.empty
1209+
}
1210+
}
1211+
o.subscribe(println(_))
1212+
}
11791213
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1208,6 +1208,30 @@ trait Observable[+T]
12081208
toScalaObservable[U](asJavaObservable.onErrorReturn(f2))
12091209
}
12101210

1211+
/**
1212+
* Intercepts `onError` notifications from the source Observable and replaces them with the
1213+
* `onNext` emissions of an Observable returned by a specified function. This allows the source
1214+
* sequence to continue even if it issues multiple `onError` notifications.
1215+
*
1216+
* <img width="640" height="310" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/onErrorFlatMap.png">
1217+
*
1218+
* @param resumeFunction a function that accepts an `Throwable` and an `Option` associated with this error representing
1219+
* the Throwable issued by the source Observable, and returns an Observable that emits items
1220+
* that will be emitted in place of the error. If no value is associated with the error, the value
1221+
* will be `None`.
1222+
* @return the original Observable, with appropriately modified behavior
1223+
*/
1224+
def onErrorFlatMap[U >: T](resumeFunction: (Throwable, Option[Any]) => Observable[U]): Observable[U] = {
1225+
val f = new Func1[rx.exceptions.OnErrorThrowable, rx.Observable[_ <: U]] {
1226+
override def call(t: rx.exceptions.OnErrorThrowable): rx.Observable[_ <: U] = {
1227+
val v = if (t.isValueNull) Some(t.getValue) else None
1228+
resumeFunction(t.getCause, v).asJavaObservable
1229+
}
1230+
}
1231+
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[U]]
1232+
toScalaObservable[U](thisJava.onErrorFlatMap(f))
1233+
}
1234+
12111235
/**
12121236
* Returns an Observable that applies a function of your choosing to the first item emitted by a
12131237
* source Observable, then feeds the result of that function along with the second item emitted

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ class CompletenessTest extends JUnitSuite {
106106
"onErrorResumeNext(Observable[_ <: T])" -> "onErrorResumeNext(Observable[U])",
107107
"onErrorReturn(Func1[Throwable, _ <: T])" -> "onErrorReturn(Throwable => U)",
108108
"onExceptionResumeNext(Observable[_ <: T])" -> "onExceptionResumeNext(Observable[U])",
109+
"onErrorFlatMap(Func1[OnErrorThrowable, _ <: Observable[_ <: T]])" -> "onErrorFlatMap((Throwable, Option[Any]) => Observable[U])",
109110
"parallel(Func1[Observable[T], Observable[R]])" -> "parallel(Observable[T] => Observable[R])",
110111
"parallel(Func1[Observable[T], Observable[R]], Scheduler)" -> "parallel(Observable[T] => Observable[R], Scheduler)",
111112
"publish(T)" -> "publish(U)",

0 commit comments

Comments
 (0)