Skip to content

Commit 4bda5f4

Browse files
committed
Introduced 'rxjava-scalaz' project. This project provides some type class instances for Observable.
* Monoid: Observable obviously forms a monoid in terms of concatenation. * Functor, Applicative, Monad, MonadPlus: Observable can be a Stream-like Monad and can be MonadPlus as well as Monoid. * Traverse, Foldable: Observable can be Stream-like traversable. __NOTE: The operations for the instance is blocking calls.__ * etc. About Testing, property based tests are applied by Scalaz's ScalaCheck binding. For QuickStart, please refer to rx.java.scala.scalaz.examples.RxScalazDemo.
1 parent d1d75d6 commit 4bda5f4

File tree

12 files changed

+585
-1
lines changed

12 files changed

+585
-1
lines changed
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# rxjava-scalaz
2+
This provides some useful type class instances for `Observable`. Therefore you can apply scalaz's fancy operators to `Observable`.
3+
4+
Provided type class instances are `Monoid`, `Monad`, `MonadPlus`, `Traverse`, `Foldable`, etc.
5+
6+
For QuickStart, please refer to [RxScalazDemo](./src/test/scala/rx/lang/scala/scalaz/examples/RxScalazDemo.scala).
7+
8+
## How to use
9+
10+
```scala
11+
import scalaz._, Scalaz._
12+
import rx.lang.scala.Observable
13+
import rx.lang.scala.scalaz._
14+
15+
Observable.items(1, 2) |+| Observable.items(3, 4) // == Observable.items(1 2 3 4)
16+
Observable.items(1, 2) {_ + 1} // == Observable.items(2, 3)
17+
(Observable.items(1, 2) |@| Observable.items(3, 4)) {_ + _} // == Observable.items(4, 5, 5, 6)
18+
1.η[Observable] // == Observable.items(1)
19+
(Observable.items(3) >>= {(i: Int) => Observable.items(i + 1)}) // Observable.items(4)
20+
```
21+
22+
Some other useful operators are available. Please see below for details.
23+
24+
## Provided Typeclass Instances
25+
### Monoid
26+
`Observable` obviously forms a monoid interms of [`concat`](https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#concat).
27+
28+
```scala
29+
(Observable.items(1, 2) |+| Observable.items(3, 4)) === Observable.items(1, 2, 3, 4)
30+
(Observable.items(1, 2) Observable.items(3, 4)) === Observable.items(1, 2, 3, 4)
31+
mzero[Observable[Int]] === Observable.empty
32+
```
33+
34+
### Monad, MonadPlus
35+
Essentially, `Observable` is similar to `Stream`. So, `Observable` can be a Stream-like `Monad` and can be a `MonadPlus` as well as `Monoid`. Of course, `Observable` can be also `Functor` and `Applicative`.
36+
37+
```scala
38+
// Functor operators
39+
(Observable.items(1, 2) {_ + 1}) === Observable.items(2, 3)
40+
(Observable.items(1, 2) >| 5) === Observable.items(5, 5)
41+
Observable.items(1, 2).fpair === Observable.items((1, 1), (2, 2))
42+
Observable.items(1, 2).fproduct {_ + 1} === Observable.items((1, 2), (2, 3))
43+
Observable.items(1, 2).strengthL("x") === Observable.items(("x", 1), ("x", 2))
44+
Observable.items(1, 2).strengthR("x") === Observable.items((1, "x"), (2, "x"))
45+
Functor[Observable].lift {(_: Int) + 1}(Observable.items(1, 2)) === Observable.items(2, 3)
46+
47+
// Applicative operators
48+
1.point[Observable] === Observable.items(1)
49+
1.η[Observable] === Observable.items(1)
50+
(Observable.items(1, 2) |@| Observable.items(3, 4)) {_ + _} === Observable.items(4, 5, 5, 6)
51+
(Observable.items(1) <*> {(_: Int) + 1}.η[Observable]) === Observable.items(2)
52+
Observable.items(1) <* Observable.items(2) === Observable.items(1)
53+
Observable.items(1) *> Observable.items(2) === Observable.items(2)
54+
55+
// Monad and MonadPlus operators
56+
(Observable.items(3) >>= {(i: Int) => Observable.items(i + 1)}) === Observable.items(4)
57+
Observable.items(3) >> Observable.items(2) === Observable.items(2)
58+
Observable.items(Observable.items(1, 2), Observable.items(3, 4)).μ === Observable.items(1, 2, 3, 4)
59+
Observable.items(1, 2) <+> Observable.items(3, 4) === Observable.items(1, 2, 3, 4)
60+
```
61+
62+
### Traverse and Foldable
63+
`Observable` can be `Traverse` and `Foldable` as well as `Stream`. This means you can fold `Observable` instance to single value.
64+
65+
```scala
66+
Observable.items(1, 2, 3).foldMap {_.toString} === "123"
67+
Observable.items(1, 2, 3).foldLeftM(0)((acc, v) => (acc * v).some) === 6.some
68+
Observable.items(1, 2, 3).suml === 6
69+
Observable.items(1, 2, 3).(_ > 3) === true
70+
Observable.items(1, 2, 3).(_ > 3) === false
71+
Observable.items(1, 2, 3).traverse(x => (x + 1).some) === Observable.items(2, 3, 4).some
72+
Observable.items(1.some, 2.some).sequence === Observable.items(1, 2).some
73+
```
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
apply plugin: 'scala'
2+
apply plugin: 'osgi'
3+
4+
tasks.withType(ScalaCompile) {
5+
scalaCompileOptions.fork = true
6+
scalaCompileOptions.unchecked = true
7+
scalaCompileOptions.setAdditionalParameters(['-feature'])
8+
9+
configure(scalaCompileOptions.forkOptions) {
10+
memoryMaximumSize = '1g'
11+
jvmArgs = ['-XX:MaxPermSize=512m']
12+
}
13+
}
14+
15+
sourceSets {
16+
main {
17+
scala {
18+
srcDir 'src/main/scala'
19+
}
20+
}
21+
test {
22+
scala {
23+
srcDir 'src/test/scala'
24+
}
25+
}
26+
}
27+
28+
dependencies {
29+
compile project(':rxjava-core')
30+
compile project(':language-adaptors:rxjava-scala')
31+
compile 'org.scalaz:scalaz-core_2.10:7.0.4'
32+
33+
testCompile 'org.scalaz:scalaz-scalacheck-binding_2.10:7.0.4'
34+
testCompile 'org.typelevel:scalaz-specs2_2.10:0.1.2'
35+
testCompile 'junit:junit-dep:4.10'
36+
}
37+
38+
tasks.compileScala {
39+
classpath = classpath + (configurations.compile + configurations.provided)
40+
}
41+
42+
task repl(type: Exec) {
43+
commandLine 'scala', '-cp', sourceSets.test.runtimeClasspath.asPath
44+
standardInput = System.in
45+
}
46+
47+
jar {
48+
manifest {
49+
name = 'rxjava-scalaz'
50+
instruction 'Bundle-Vendor', 'Netflix'
51+
instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava'
52+
instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,!org.scalatest.*,*'
53+
instruction 'Fragment-Host', 'com.netflix.rxjava.core'
54+
}
55+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.lang.scala
17+
18+
import scala.language.higherKinds
19+
import _root_.scalaz._
20+
import _root_.scalaz.Tags.{Zip => TZip}
21+
import scala.concurrent._
22+
import scala.concurrent.duration._
23+
24+
/**
25+
* This package object provides some type class instances for Observable.
26+
*/
27+
package object scalaz {
28+
29+
// Monoid
30+
implicit def observableMonoid[A] = new Monoid[Observable[A]] {
31+
override def zero: Observable[A] = Observable.empty
32+
override def append(f1: Observable[A], f2: => Observable[A]): Observable[A] = f1 ++ f2
33+
}
34+
35+
implicit val observableInstances = new MonadPlus[Observable] with Zip[Observable]
36+
with IsEmpty[Observable] with Traverse[Observable] {
37+
38+
// Monad
39+
override def point[A](a: => A) = Observable.items(a)
40+
override def bind[A, B](oa: Observable[A])(f: (A) => Observable[B]) = oa.flatMap(f)
41+
42+
// MonadPlus
43+
override def empty[A]: Observable[A] = observableMonoid[A].zero
44+
override def plus[A](a: Observable[A], b: => Observable[A]): Observable[A] = observableMonoid[A].append(a, b)
45+
46+
// Zip
47+
override def zip[A, B](a: => Observable[A], b: => Observable[B]): Observable[(A, B)] = a zip b
48+
49+
// IsEmpty (NOTE: This method is blocking call)
50+
override def isEmpty[A](fa: Observable[A]): Boolean = getOne(fa.isEmpty)
51+
52+
// Traverse (NOTE: This method is blocking call)
53+
override def traverseImpl[G[_], A, B](fa: Observable[A])(f: (A) => G[B])(implicit G: Applicative[G]): G[Observable[B]] = {
54+
val seed: G[Observable[B]] = G.point(Observable.empty)
55+
getOne(fa.foldLeft(seed) {
56+
(ys, x) => G.apply2(ys, f(x))((bs, b) => bs :+ b)
57+
}.head)
58+
}
59+
60+
// This method extracts first elements from Observable.
61+
// NOTE: Make sure that 'o' has at least one element.
62+
private[this] def getOne[T](o: Observable[T]): T = {
63+
val p = Promise[T]
64+
val sub = o.first.subscribe(p.success(_))
65+
try {
66+
Await.result(p.future, Duration.Inf)
67+
} finally {
68+
sub.unsubscribe()
69+
}
70+
}
71+
}
72+
73+
// Observable can be ZipList like applicative functor.
74+
// However, due to https://github.com/scalaz/scalaz/issues/338,
75+
// This instance doesn't have 'implicit' modifier.
76+
val observableZipApplicative: Applicative[({ type λ[α] = Observable[α] @@ TZip })#λ] = new Applicative[({ type λ[α] = Observable[α] @@ TZip })#λ] {
77+
def point[A](a: => A) = TZip(Observable.items(a).repeat)
78+
def ap[A, B](oa: => Observable[A] @@ TZip)(of: => Observable[A => B] @@ TZip) = TZip(of.zip(oa) map {fa => fa._1(fa._2)})
79+
}
80+
81+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.lang.scala.scalaz
17+
18+
import scalaz._
19+
import rx.lang.scala.Observable
20+
import org.scalacheck.Arbitrary
21+
import scala.concurrent.{Await, Promise}
22+
import scala.concurrent.duration.Duration
23+
24+
/**
25+
* This object provides implicits for tests.
26+
*/
27+
object ImplicitsForTest {
28+
29+
// Equality based on sequenceEqual() method.
30+
implicit def observableEqual[A](implicit eqA: Equal[A]) = new Equal[Observable[A]]{
31+
def equal(a1: Observable[A], a2: Observable[A]) = {
32+
val p = Promise[Boolean]
33+
val sub = a1.sequenceEqual(a2).firstOrElse(false).subscribe(v => p.success(v))
34+
try {
35+
Await.result(p.future, Duration.Inf)
36+
} finally {
37+
sub.unsubscribe()
38+
}
39+
}
40+
}
41+
42+
implicit def observableArbitrary[A](implicit a: Arbitrary[A], array: Arbitrary[Array[A]]): Arbitrary[Observable[A]]
43+
= Arbitrary(for (arr <- array.arbitrary) yield Observable.items(arr:_*))
44+
45+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.lang.scala.scalaz
17+
18+
import scalaz._
19+
import Scalaz._
20+
import scalaz.scalacheck.ScalazProperties._
21+
import rx.lang.scala.Observable
22+
import org.specs2.scalaz.Spec
23+
import org.specs2.runner.JUnitRunner
24+
import org.junit.runner.RunWith
25+
26+
/**
27+
* Even though Equal[Observable[A]] instance is only for tests.
28+
* However, we should test Equal[Observable[A]] instance,
29+
* Because the result of whole test is based on this instance.
30+
*/
31+
@RunWith(classOf[JUnitRunner])
32+
class ObservableEqualSpec extends Spec{
33+
34+
import rx.lang.scala.scalaz._
35+
import ImplicitsForTest._
36+
37+
"Observable" should {
38+
"satisfies equal laws" in {
39+
checkAll(equal.laws[Observable[Int]])
40+
}
41+
}
42+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.lang.scala.scalaz
17+
18+
import scalaz._
19+
import Scalaz._
20+
import scalaz.scalacheck.ScalazProperties._
21+
import rx.lang.scala.Observable
22+
import org.specs2.scalaz.Spec
23+
import org.specs2.runner.JUnitRunner
24+
import org.junit.runner.RunWith
25+
26+
@RunWith(classOf[JUnitRunner])
27+
class ObservableIsEmptySpec extends Spec{
28+
29+
import rx.lang.scala.scalaz._
30+
import ImplicitsForTest._
31+
32+
"Observable" should {
33+
"satisfies isEmpty laws" in {
34+
checkAll(isEmpty.laws[Observable])
35+
}
36+
}
37+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.lang.scala.scalaz
17+
18+
import scalaz._
19+
import Scalaz._
20+
import scalaz.scalacheck.ScalazProperties._
21+
import rx.lang.scala.Observable
22+
import org.specs2.scalaz.Spec
23+
import org.specs2.runner.JUnitRunner
24+
import org.junit.runner.RunWith
25+
26+
@RunWith(classOf[JUnitRunner])
27+
class ObservableMonadSpec extends Spec {
28+
29+
import rx.lang.scala.scalaz._
30+
import ImplicitsForTest._
31+
32+
"Observable" should {
33+
"satisfies monad laws" in {
34+
checkAll(monad.laws[Observable])
35+
}
36+
"satisfies monadplus laws" in {
37+
checkAll(monadPlus.strongLaws[Observable])
38+
}
39+
}
40+
}

0 commit comments

Comments
 (0)