@@ -21,47 +21,21 @@ import org.scalatest.time.{ Millis, Seconds, Span }
21
21
22
22
import scala .concurrent .Future
23
23
import scala .language .implicitConversions
24
- import scala .util .{ Failure , Success }
25
24
26
25
trait FuturesSpec extends ScalaFutures {
27
26
28
27
implicit val defaultPatience : PatienceConfig = PatienceConfig (timeout = Span (60 , Seconds ), interval = Span (5 , Millis ))
29
28
30
- implicit def observableToFuture [TResult ](observable : Observable [TResult ]): Future [Seq [TResult ]] =
31
- observable.toFuture()
32
-
33
- implicit def observableToFutureConcept [T ](observable : Observable [T ]): FutureConcept [Seq [T ]] = {
34
- val future : Future [Seq [T ]] = observable
35
- new FutureConcept [Seq [T ]] {
36
- def eitherValue : Option [Either [Throwable , Seq [T ]]] = {
37
- future.value.map {
38
- case Success (o) => Right (o)
39
- case Failure (e) => Left (e)
40
- }
41
- }
42
- def isExpired : Boolean = false
29
+ implicit def observableToFuture [T ](observable : Observable [T ]): Future [Seq [T ]] =
30
+ observable.collect().toFuture()
43
31
44
- // Scala Futures themselves don't support the notion of a timeout
45
- def isCanceled : Boolean = false // Scala Futures don't seem to be cancelable either
46
- }
47
- }
48
-
49
- implicit def observableToFuture [TResult ](observable : SingleObservable [TResult ]): Future [TResult ] =
32
+ implicit def singleObservableToFuture [T ](observable : SingleObservable [T ]): Future [T ] =
50
33
observable.toFuture()
51
- implicit def observableToFutureConcept [T ](observable : SingleObservable [T ]): FutureConcept [T ] = {
52
- val future : Future [T ] = observable.toFuture()
53
- new FutureConcept [T ] {
54
- def eitherValue : Option [Either [Throwable , T ]] = {
55
- future.value.map {
56
- case Success (o) => Right (o)
57
- case Failure (e) => Left (e)
58
- }
59
- }
60
- def isExpired : Boolean = false
61
34
62
- // Scala Futures themselves don't support the notion of a timeout
63
- def isCanceled : Boolean = false // Scala Futures don't seem to be cancelable either
64
- }
65
- }
35
+ implicit def observableToFutureConcept [T ](observable : Observable [T ]): FutureConcept [Seq [T ]] =
36
+ convertScalaFuture(observable.collect().toFuture())
37
+
38
+ implicit def singleObservableToFutureConcept [T ](observable : SingleObservable [T ]): FutureConcept [T ] =
39
+ convertScalaFuture(observable.toFuture())
66
40
67
41
}
0 commit comments