Skip to content

Commit 6ffe9d7

Browse files
Merge pull request #1297 from everpeace/rxjava-scalaz
[RxScala] rxjava-scalaz: providing some type class instances.
2 parents a0e5cf1 + 6cf9bf9 commit 6ffe9d7

File tree

12 files changed

+571
-1
lines changed

12 files changed

+571
-1
lines changed
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# rxjava-scalaz
2+
This provides some useful type class instances for `Observable`. Therefore you can apply scalaz's fancy operators to `Observable`.
3+
4+
Provided type class instances are `Monoid`, `Monad`, `MonadPlus`, `Traverse`, `Foldable`, etc.
5+
6+
For QuickStart, please refer to [RxScalazDemo](./src/test/scala/rx/lang/scala/scalaz/examples/RxScalazDemo.scala).
7+
8+
## How to use
9+
10+
```scala
11+
import scalaz._, Scalaz._
12+
import rx.lang.scala.Observable
13+
import rx.lang.scala.scalaz._
14+
15+
Observable.items(1, 2) |+| Observable.items(3, 4) // == Observable.items(1 2 3 4)
16+
Observable.items(1, 2) {_ + 1} // == Observable.items(2, 3)
17+
(Observable.items(1, 2) |@| Observable.items(3, 4)) {_ + _} // == Observable.items(4, 5, 5, 6)
18+
1.η[Observable] // == Observable.items(1)
19+
(Observable.items(3) >>= {(i: Int) => Observable.items(i + 1)}) // Observable.items(4)
20+
```
21+
22+
Some other useful operators are available. Please see below for details.
23+
24+
## Provided Typeclass Instances
25+
### Monoid
26+
`Observable` obviously forms a monoid interms of [`concat`](https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#concat).
27+
28+
```scala
29+
(Observable.items(1, 2) |+| Observable.items(3, 4)) === Observable.items(1, 2, 3, 4)
30+
(Observable.items(1, 2) Observable.items(3, 4)) === Observable.items(1, 2, 3, 4)
31+
mzero[Observable[Int]] === Observable.empty
32+
```
33+
34+
### Monad, MonadPlus
35+
Essentially, `Observable` is similar to `Stream`. So, `Observable` can be a Stream-like `Monad` and can be a `MonadPlus` as well as `Monoid`. Of course, `Observable` can be also `Functor` and `Applicative`.
36+
37+
```scala
38+
// Functor operators
39+
(Observable.items(1, 2) {_ + 1}) === Observable.items(2, 3)
40+
(Observable.items(1, 2) >| 5) === Observable.items(5, 5)
41+
Observable.items(1, 2).fpair === Observable.items((1, 1), (2, 2))
42+
Observable.items(1, 2).fproduct {_ + 1} === Observable.items((1, 2), (2, 3))
43+
Observable.items(1, 2).strengthL("x") === Observable.items(("x", 1), ("x", 2))
44+
Observable.items(1, 2).strengthR("x") === Observable.items((1, "x"), (2, "x"))
45+
Functor[Observable].lift {(_: Int) + 1}(Observable.items(1, 2)) === Observable.items(2, 3)
46+
47+
// Applicative operators
48+
1.point[Observable] === Observable.items(1)
49+
1.η[Observable] === Observable.items(1)
50+
(Observable.items(1, 2) |@| Observable.items(3, 4)) {_ + _} === Observable.items(4, 5, 5, 6)
51+
(Observable.items(1) <*> {(_: Int) + 1}.η[Observable]) === Observable.items(2)
52+
Observable.items(1) <* Observable.items(2) === Observable.items(1)
53+
Observable.items(1) *> Observable.items(2) === Observable.items(2)
54+
55+
// Monad and MonadPlus operators
56+
(Observable.items(3) >>= {(i: Int) => Observable.items(i + 1)}) === Observable.items(4)
57+
Observable.items(3) >> Observable.items(2) === Observable.items(2)
58+
Observable.items(Observable.items(1, 2), Observable.items(3, 4)).μ === Observable.items(1, 2, 3, 4)
59+
Observable.items(1, 2) <+> Observable.items(3, 4) === Observable.items(1, 2, 3, 4)
60+
```
61+
62+
### Traverse and Foldable
63+
`Observable` can be `Traverse` and `Foldable` as well as `Stream`. This means you can fold `Observable` instance to single value.
64+
65+
```scala
66+
Observable.items(1, 2, 3).foldMap {_.toString} === "123"
67+
Observable.items(1, 2, 3).foldLeftM(0)((acc, v) => (acc * v).some) === 6.some
68+
Observable.items(1, 2, 3).suml === 6
69+
Observable.items(1, 2, 3).(_ > 3) === true
70+
Observable.items(1, 2, 3).(_ > 3) === false
71+
Observable.items(1, 2, 3).traverse(x => (x + 1).some) === Observable.items(2, 3, 4).some
72+
Observable.items(1.some, 2.some).sequence === Observable.items(1, 2).some
73+
```
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
apply plugin: 'scala'
2+
apply plugin: 'osgi'
3+
4+
tasks.withType(ScalaCompile) {
5+
scalaCompileOptions.fork = true
6+
scalaCompileOptions.unchecked = true
7+
scalaCompileOptions.setAdditionalParameters(['-feature'])
8+
9+
configure(scalaCompileOptions.forkOptions) {
10+
memoryMaximumSize = '1g'
11+
jvmArgs = ['-XX:MaxPermSize=512m']
12+
}
13+
}
14+
15+
sourceSets {
16+
main {
17+
scala {
18+
srcDir 'src/main/scala'
19+
}
20+
}
21+
test {
22+
scala {
23+
srcDir 'src/test/scala'
24+
}
25+
}
26+
}
27+
28+
dependencies {
29+
compile project(':rxjava-core')
30+
compile project(':language-adaptors:rxjava-scala')
31+
compile 'org.scalaz:scalaz-core_2.10:7.0.4'
32+
33+
testCompile 'org.scalaz:scalaz-scalacheck-binding_2.10:7.0.4'
34+
testCompile 'org.typelevel:scalaz-specs2_2.10:0.1.2'
35+
testCompile 'junit:junit-dep:4.10'
36+
}
37+
38+
tasks.compileScala {
39+
classpath = classpath + (configurations.compile + configurations.provided)
40+
}
41+
42+
task repl(type: Exec) {
43+
commandLine 'scala', '-cp', sourceSets.test.runtimeClasspath.asPath
44+
standardInput = System.in
45+
}
46+
47+
jar {
48+
manifest {
49+
name = 'rxjava-scalaz'
50+
instruction 'Bundle-Vendor', 'Netflix'
51+
instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava'
52+
instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,!org.scalatest.*,*'
53+
instruction 'Fragment-Host', 'com.netflix.rxjava.core'
54+
}
55+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.lang.scala
17+
18+
import scala.language.higherKinds
19+
import _root_.scalaz._
20+
import _root_.scalaz.Tags.{Zip => TZip}
21+
22+
/**
23+
* This package object provides some type class instances for Observable.
24+
*/
25+
package object scalaz {
26+
27+
// Monoid
28+
implicit def observableMonoid[A] = new Monoid[Observable[A]] {
29+
override def zero: Observable[A] = Observable.empty
30+
override def append(f1: Observable[A], f2: => Observable[A]): Observable[A] = f1 ++ f2
31+
}
32+
33+
implicit val observableInstances = new MonadPlus[Observable] with Zip[Observable]
34+
with IsEmpty[Observable] with Traverse[Observable] {
35+
36+
// Monad
37+
override def point[A](a: => A) = Observable.items(a)
38+
override def bind[A, B](oa: Observable[A])(f: (A) => Observable[B]) = oa.flatMap(f)
39+
40+
// MonadPlus
41+
override def empty[A]: Observable[A] = observableMonoid[A].zero
42+
override def plus[A](a: Observable[A], b: => Observable[A]): Observable[A] = observableMonoid[A].append(a, b)
43+
44+
// Zip
45+
override def zip[A, B](a: => Observable[A], b: => Observable[B]): Observable[(A, B)] = a zip b
46+
47+
// IsEmpty (NOTE: This method is blocking call)
48+
override def isEmpty[A](fa: Observable[A]): Boolean = fa.isEmpty.toBlocking.first
49+
50+
// Traverse (NOTE: This method is blocking call)
51+
override def traverseImpl[G[_], A, B](fa: Observable[A])(f: (A) => G[B])(implicit G: Applicative[G]): G[Observable[B]] = {
52+
val seed: G[Observable[B]] = G.point(Observable.empty)
53+
fa.foldLeft(seed) {
54+
(ys, x) => G.apply2(ys, f(x))((bs, b) => bs :+ b)
55+
}.toBlocking.first
56+
}
57+
}
58+
59+
// Observable can be ZipList like applicative functor.
60+
// However, due to https://github.com/scalaz/scalaz/issues/338,
61+
// This instance doesn't have 'implicit' modifier.
62+
val observableZipApplicative: Applicative[({ type λ[α] = Observable[α] @@ TZip })#λ] = new Applicative[({ type λ[α] = Observable[α] @@ TZip })#λ] {
63+
def point[A](a: => A) = TZip(Observable.items(a).repeat)
64+
def ap[A, B](oa: => Observable[A] @@ TZip)(of: => Observable[A => B] @@ TZip) = TZip(of.zip(oa) map {fa => fa._1(fa._2)})
65+
}
66+
67+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.lang.scala.scalaz
17+
18+
import scalaz._
19+
import rx.lang.scala.Observable
20+
import org.scalacheck.Arbitrary
21+
import scala.concurrent.{Await, Promise}
22+
import scala.concurrent.duration.Duration
23+
24+
/**
25+
* This object provides implicits for tests.
26+
*/
27+
object ImplicitsForTest {
28+
29+
// Equality based on sequenceEqual() method.
30+
implicit def observableEqual[A](implicit eqA: Equal[A]) = new Equal[Observable[A]]{
31+
def equal(a1: Observable[A], a2: Observable[A]) = {
32+
val p = Promise[Boolean]
33+
val sub = a1.sequenceEqual(a2).firstOrElse(false).subscribe(v => p.success(v))
34+
try {
35+
Await.result(p.future, Duration.Inf)
36+
} finally {
37+
sub.unsubscribe()
38+
}
39+
}
40+
}
41+
42+
implicit def observableArbitrary[A](implicit a: Arbitrary[A], array: Arbitrary[Array[A]]): Arbitrary[Observable[A]]
43+
= Arbitrary(for (arr <- array.arbitrary) yield Observable.items(arr:_*))
44+
45+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.lang.scala.scalaz
17+
18+
import scalaz._
19+
import Scalaz._
20+
import scalaz.scalacheck.ScalazProperties._
21+
import rx.lang.scala.Observable
22+
import org.specs2.scalaz.Spec
23+
import org.specs2.runner.JUnitRunner
24+
import org.junit.runner.RunWith
25+
26+
/**
27+
* Even though Equal[Observable[A]] instance is only for tests.
28+
* However, we should test Equal[Observable[A]] instance,
29+
* Because the result of whole test is based on this instance.
30+
*/
31+
@RunWith(classOf[JUnitRunner])
32+
class ObservableEqualSpec extends Spec{
33+
34+
import rx.lang.scala.scalaz._
35+
import ImplicitsForTest._
36+
37+
"Observable" should {
38+
"satisfies equal laws" in {
39+
checkAll(equal.laws[Observable[Int]])
40+
}
41+
}
42+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.lang.scala.scalaz
17+
18+
import scalaz._
19+
import Scalaz._
20+
import scalaz.scalacheck.ScalazProperties._
21+
import rx.lang.scala.Observable
22+
import org.specs2.scalaz.Spec
23+
import org.specs2.runner.JUnitRunner
24+
import org.junit.runner.RunWith
25+
26+
@RunWith(classOf[JUnitRunner])
27+
class ObservableIsEmptySpec extends Spec{
28+
29+
import rx.lang.scala.scalaz._
30+
import ImplicitsForTest._
31+
32+
"Observable" should {
33+
"satisfies isEmpty laws" in {
34+
checkAll(isEmpty.laws[Observable])
35+
}
36+
}
37+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.lang.scala.scalaz
17+
18+
import scalaz._
19+
import Scalaz._
20+
import scalaz.scalacheck.ScalazProperties._
21+
import rx.lang.scala.Observable
22+
import org.specs2.scalaz.Spec
23+
import org.specs2.runner.JUnitRunner
24+
import org.junit.runner.RunWith
25+
26+
@RunWith(classOf[JUnitRunner])
27+
class ObservableMonadSpec extends Spec {
28+
29+
import rx.lang.scala.scalaz._
30+
import ImplicitsForTest._
31+
32+
"Observable" should {
33+
"satisfies monad laws" in {
34+
checkAll(monad.laws[Observable])
35+
}
36+
"satisfies monadplus laws" in {
37+
checkAll(monadPlus.strongLaws[Observable])
38+
}
39+
}
40+
}

0 commit comments

Comments
 (0)