@@ -18,8 +18,6 @@ package rx.lang.scala
18
18
import scala .language .higherKinds
19
19
import _root_ .scalaz ._
20
20
import _root_ .scalaz .Tags .{Zip => TZip }
21
- import scala .concurrent ._
22
- import scala .concurrent .duration ._
23
21
24
22
/**
25
23
* This package object provides some type class instances for Observable.
@@ -47,26 +45,14 @@ package object scalaz {
47
45
override def zip [A , B ](a : => Observable [A ], b : => Observable [B ]): Observable [(A , B )] = a zip b
48
46
49
47
// IsEmpty (NOTE: This method is blocking call)
50
- override def isEmpty [A ](fa : Observable [A ]): Boolean = getOne( fa.isEmpty)
48
+ override def isEmpty [A ](fa : Observable [A ]): Boolean = fa.isEmpty.toBlocking.first
51
49
52
50
// Traverse (NOTE: This method is blocking call)
53
51
override def traverseImpl [G [_], A , B ](fa : Observable [A ])(f : (A ) => G [B ])(implicit G : Applicative [G ]): G [Observable [B ]] = {
54
52
val seed : G [Observable [B ]] = G .point(Observable .empty)
55
- getOne( fa.foldLeft(seed) {
53
+ fa.foldLeft(seed) {
56
54
(ys, x) => G .apply2(ys, f(x))((bs, b) => bs :+ b)
57
- }.head)
58
- }
59
-
60
- // This method extracts first elements from Observable.
61
- // NOTE: Make sure that 'o' has at least one element.
62
- private [this ] def getOne [T ](o : Observable [T ]): T = {
63
- val p = Promise [T ]
64
- val sub = o.first.subscribe(p.success(_))
65
- try {
66
- Await .result(p.future, Duration .Inf )
67
- } finally {
68
- sub.unsubscribe()
69
- }
55
+ }.toBlocking.first
70
56
}
71
57
}
72
58
0 commit comments