Skip to content

Commit 8a3e4db

Browse files
committed
Scala Observables fix (#787)
Should handle errors from user functions gracefully JAVA-4304
1 parent 93ad7b6 commit 8a3e4db

File tree

6 files changed

+101
-15
lines changed

6 files changed

+101
-15
lines changed

driver-scala/src/main/scala/org/mongodb/scala/internal/FilterObservable.scala

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package org.mongodb.scala.internal
1818

1919
import org.mongodb.scala.{ Observable, Observer, Subscription }
2020

21+
import scala.util.{ Failure, Success, Try }
22+
2123
private[scala] case class FilterObservable[T](observable: Observable[T], p: T => Boolean) extends Observable[T] {
2224
override def subscribe(observer: Observer[_ >: T]): Unit = {
2325
observable.subscribe(
@@ -43,10 +45,14 @@ private[scala] case class FilterObservable[T](observable: Observable[T], p: T =>
4345
}
4446

4547
override def onNext(tResult: T): Unit = {
46-
if (p(tResult)) {
47-
observer.onNext(tResult)
48-
} else if (!terminated) {
49-
subscription.foreach(_.request(1)) // No match, request more from down stream
48+
Try(p(tResult)) match {
49+
case Success(result) =>
50+
if (result) {
51+
observer.onNext(tResult)
52+
} else if (!terminated) {
53+
subscription.foreach(_.request(1)) // No match, request more from down stream
54+
}
55+
case Failure(exception) => onError(exception)
5056
}
5157
}
5258
}

driver-scala/src/main/scala/org/mongodb/scala/internal/FlatMapObservable.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.mongodb.scala.internal
1919
import org.mongodb.scala._
2020

2121
import java.util.concurrent.atomic.AtomicReference
22+
import scala.util.{ Failure, Success, Try }
2223

2324
sealed trait State
2425
case object Init extends State
@@ -78,11 +79,19 @@ private[scala] case class FlatMapObservable[T, S](observable: Observable[T], f:
7879
}
7980

8081
override def onError(throwable: Throwable): Unit = {
82+
state.set(Error)
8183
observer.onError(throwable)
8284
}
8385

8486
override def onNext(tResult: T): Unit = {
85-
f(tResult).subscribe(
87+
Try(f(tResult)) match {
88+
case Success(result) => onNextMappedObservable(result);
89+
case Failure(exception) => onError(exception)
90+
}
91+
}
92+
93+
private def onNextMappedObservable(mappedObservable: Observable[S]): Unit = {
94+
mappedObservable.subscribe(
8695
new Observer[S]() {
8796
override def onError(throwable: Throwable): Unit = {
8897
state.set(Error)

driver-scala/src/main/scala/org/mongodb/scala/internal/FoldLeftObservable.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@
1717
package org.mongodb.scala.internal
1818

1919
import java.util.concurrent.atomic.AtomicBoolean
20-
2120
import org.mongodb.scala.{ Observable, Observer, SingleObservable, Subscription }
2221

22+
import scala.util.{ Failure, Success, Try }
23+
2324
private[scala] case class FoldLeftObservable[T, S](observable: Observable[T], initialValue: S, accumulator: (S, T) => S)
2425
extends SingleObservable[S] {
2526

@@ -55,7 +56,10 @@ private[scala] case class FoldLeftObservable[T, S](observable: Observable[T], in
5556
}
5657

5758
override def onNext(tResult: T): Unit = {
58-
currentValue = accumulator(currentValue, tResult)
59+
Try(accumulator(currentValue, tResult)) match {
60+
case Success(result) => currentValue = result;
61+
case Failure(exception) => onError(exception)
62+
}
5963
}
6064
}
6165
)

driver-scala/src/main/scala/org/mongodb/scala/internal/MapObservable.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package org.mongodb.scala.internal
1818

1919
import org.mongodb.scala.{ Observable, Observer, Subscription }
2020

21+
import scala.util.{ Failure, Success, Try }
22+
2123
private[scala] case class MapObservable[T, S](observable: Observable[T], s: T => S, f: Throwable => Throwable = t => t)
2224
extends Observable[S] {
2325
override def subscribe(observer: Observer[_ >: S]): Unit = {
@@ -30,7 +32,12 @@ private[scala] case class MapObservable[T, S](observable: Observable[T], s: T =>
3032

3133
override def onComplete(): Unit = observer.onComplete()
3234

33-
override def onNext(tResult: T): Unit = observer.onNext(s(tResult))
35+
override def onNext(tResult: T): Unit = {
36+
Try(s(tResult)) match {
37+
case Success(result) => observer.onNext(result);
38+
case Failure(exception) => observer.onError(exception)
39+
}
40+
}
3441
}
3542
)
3643
)

driver-scala/src/test/scala/org/mongodb/scala/internal/FlatMapObservableTest.scala

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,36 @@ class FlatMapObservableTest extends BaseSpec with Futures with Eventually {
3535
_ => (),
3636
e => p.failure(e),
3737
() => {
38-
completedCounter.incrementAndGet()
39-
Thread.sleep(100)
4038
p.trySuccess(())
39+
completedCounter.incrementAndGet()
4140
}
4241
)
4342
eventually(assert(completedCounter.get() == 1, s"${completedCounter.get()}"))
44-
Thread.sleep(200)
45-
assert(completedCounter.get() == 1, s"${completedCounter.get()}")
46-
Thread.sleep(1000)
43+
}
44+
45+
it should "call onError if the mapper fails" in {
46+
val p = Promise[Unit]()
47+
val errorCounter = new AtomicInteger(0)
48+
Observable(1 to 100)
49+
.flatMap(
50+
x =>
51+
if (x > 10) {
52+
throw new IllegalStateException("Fail")
53+
} else {
54+
createObservable(x)
55+
}
56+
)
57+
.subscribe(
58+
_ => (),
59+
_ => {
60+
p.trySuccess()
61+
errorCounter.incrementAndGet()
62+
},
63+
() => {
64+
p.failure(new IllegalStateException("Should not complete"))
65+
}
66+
)
67+
eventually(assert(errorCounter.get() == 1, s"${errorCounter.get()}"))
4768
}
4869

4970
private def createObservable(x: Int): Observable[Int] = new Observable[Int] {

driver-scala/src/test/scala/org/mongodb/scala/internal/ObservableImplementationSpec.scala

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,17 @@ class ObservableImplementationSpec extends BaseSpec with TableDrivenPropertyChec
5959
forAll(failingObservables) { (observable: Observable[Int]) =>
6060
{
6161
var thrown = false
62-
observable.subscribe((res: Int) => (), (t: Throwable) => thrown = true)
62+
observable.subscribe(_ => (), _ => thrown = true)
63+
thrown should equal(true)
64+
}
65+
}
66+
}
67+
68+
it should "be well behaved when errors are caused by passed in function and call onError" in {
69+
forAll(failingFunctionsObservables) { (observable: Observable[Int]) =>
70+
{
71+
var thrown = false
72+
observable.subscribe(_ => (), _ => thrown = true)
6373
thrown should equal(true)
6474
}
6575
}
@@ -208,7 +218,7 @@ class ObservableImplementationSpec extends BaseSpec with TableDrivenPropertyChec
208218

209219
val failOn = 30
210220

211-
def failingObservables =
221+
private def failingObservables =
212222
Table(
213223
"observable",
214224
TestObservable[Int](failOn = failOn),
@@ -231,6 +241,35 @@ class ObservableImplementationSpec extends BaseSpec with TableDrivenPropertyChec
231241
ZipObservable[Int, Int](TestObservable[Int](failOn = failOn), TestObservable[Int]()).map[Int](a => a._1)
232242
)
233243

244+
private def failingFunctionsObservables =
245+
Table(
246+
"observable",
247+
FilterObservable[Int](TestObservable[Int](), (i: Int) => {
248+
if (i > 10) {
249+
throw new RuntimeException("Error")
250+
}
251+
i % 2 == 0
252+
}),
253+
FlatMapObservable[Int, Int](TestObservable[Int](), (i: Int) => {
254+
if (i > 10) {
255+
throw new RuntimeException("Error")
256+
}
257+
TestObservable[Int](1 to 2)
258+
}),
259+
FoldLeftObservable(TestObservable[Int](1 to 100), 0, (v: Int, i: Int) => {
260+
if (i > 10) {
261+
throw new RuntimeException("Error")
262+
}
263+
v + i
264+
}),
265+
MapObservable[Int, Int](TestObservable[Int](), (i: Int) => {
266+
if (i > 10) {
267+
throw new RuntimeException("Error")
268+
}
269+
i * 100
270+
})
271+
)
272+
234273
private def happyObservables =
235274
Table(
236275
("observable", "observer"),

0 commit comments

Comments
 (0)