Skip to content

Commit 7ea8e4f

Browse files
committed
Merge branch 'master' into cachedthreadscheduler2
2 parents 538d245 + e8afd05 commit 7ea8e4f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1893
-1216
lines changed

language-adaptors/rxjava-scala/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ This adaptor allows to use RxJava in Scala with anonymous functions, e.g.
55
```scala
66
val o = Observable.interval(200 millis).take(5)
77
o.subscribe(n => println("n = " + n))
8-
Observable(1, 2, 3, 4).reduce(_ + _)
8+
Observable.items(1, 2, 3, 4).reduce(_ + _)
99
```
1010

1111
For-comprehensions are also supported:
1212

1313
```scala
14-
val first = Observable(10, 11, 12)
15-
val second = Observable(10, 11, 12)
14+
val first = Observable.items(10, 11, 12)
15+
val second = Observable.items(10, 11, 12)
1616
val booleans = for ((n1, n2) <- (first zip second)) yield (n1 == n2)
1717
```
1818

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,31 @@ class RxScalaDemo extends JUnitSuite {
448448
.toBlockingObservable.foreach(println(_))
449449
}
450450

451+
/**
452+
* This is a bad way of using `zip` with an `Iterable`: even if the consumer unsubscribes,
453+
* some elements may still be pulled from `Iterable`.
454+
*/
455+
@Test def zipWithIterableBadExample() {
456+
val o1 = Observable.interval(100 millis, IOScheduler()).map(_ * 100).take(3)
457+
val o2 = Observable.from(0 until Int.MaxValue).doOnEach(i => println(i + " from o2"))
458+
o1.zip(o2).toBlockingObservable.foreach(println(_))
459+
}
460+
461+
/**
462+
* This is a good way of using `zip` with an `Iterable`: if the consumer unsubscribes,
463+
* no more elements will be pulled from `Iterable`.
464+
*/
465+
@Test def zipWithIterableGoodExample() {
466+
val o1 = Observable.interval(100 millis, IOScheduler()).map(_ * 100).take(3)
467+
val iter = (0 until Int.MaxValue).view.map {
468+
i => {
469+
println(i + " from iter")
470+
i
471+
}
472+
}
473+
o1.zip(iter).toBlockingObservable.foreach(println(_))
474+
}
475+
451476
@Test def takeFirstWithCondition() {
452477
val condition: Int => Boolean = _ >= 3
453478
assertEquals(3, List(1, 2, 3, 4).toObservable.filter(condition).first.toBlockingObservable.single)
@@ -620,6 +645,34 @@ class RxScalaDemo extends JUnitSuite {
620645
o.take(1).subscribe(println(_))
621646
}
622647

648+
@Test def createExampleGood2() {
649+
import scala.io.{Codec, Source}
650+
651+
val rxscala = Observable[String](subscriber => {
652+
try {
653+
val input = new java.net.URL("http://rxscala.github.io/").openStream()
654+
subscriber.add(Subscription {
655+
input.close()
656+
})
657+
Source.fromInputStream(input)(Codec.UTF8).getLines()
658+
.takeWhile(_ => !subscriber.isUnsubscribed)
659+
.foreach(subscriber.onNext(_))
660+
if (!subscriber.isUnsubscribed) {
661+
subscriber.onCompleted()
662+
}
663+
}
664+
catch {
665+
case e: Throwable => if (!subscriber.isUnsubscribed) subscriber.onError(e)
666+
}
667+
}).subscribeOn(IOScheduler())
668+
669+
val count = rxscala.flatMap(_.split("\\W+").toSeq.toObservable)
670+
.map(_.toLowerCase)
671+
.filter(_ == "rxscala")
672+
.size
673+
println(s"RxScala appears ${count.toBlockingObservable.single} times in http://rxscala.github.io/")
674+
}
675+
623676
def output(s: String): Unit = println(s)
624677

625678
/** Subscribes to obs and waits until obs has completed. Note that if you subscribe to

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,42 @@ trait Observable[+T]
336336
zipWith(that, (t: T, u: U) => (t, u))
337337
}
338338

339+
/**
340+
* Returns an Observable formed from `this` Observable and `other` Iterable by combining
341+
* corresponding elements in pairs.
342+
* <p>
343+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.i.png">
344+
* <p>
345+
* Note that the `other` Iterable is evaluated as items are observed from the source Observable; it is
346+
* not pre-consumed. This allows you to zip infinite streams on either side.
347+
*
348+
* @param other the Iterable sequence
349+
* @return an Observable that pairs up values from the source Observable and the `other` Iterable.
350+
*/
351+
def zip[U](other: Iterable[U]): Observable[(T, U)] = {
352+
zipWith(other, (t: T, u: U) => (t, u))
353+
}
354+
355+
/**
356+
* Returns an Observable that emits items that are the result of applying a specified function to pairs of
357+
* values, one each from the source Observable and a specified Iterable sequence.
358+
* <p>
359+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.i.png">
360+
* <p>
361+
* Note that the `other` Iterable is evaluated as items are observed from the source Observable; it is
362+
* not pre-consumed. This allows you to zip infinite streams on either side.
363+
*
364+
* @param other the Iterable sequence
365+
* @param selector a function that combines the pairs of items from the Observable and the Iterable to generate
366+
* the items to be emitted by the resulting Observable
367+
* @return an Observable that pairs up values from the source Observable and the `other` Iterable
368+
* sequence and emits the results of `selector` applied to these pairs
369+
*/
370+
def zipWith[U, R](other: Iterable[U], selector: (T, U) => R): Observable[R] = {
371+
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]]
372+
toScalaObservable[R](thisJava.zip(other.asJava, selector))
373+
}
374+
339375
/**
340376
* Returns an Observable formed from this Observable and another Observable by combining
341377
* corresponding elements using the selector function.
@@ -353,7 +389,7 @@ trait Observable[+T]
353389
* their index. Indices start at 0.
354390
*/
355391
def zipWithIndex: Observable[(T, Int)] = {
356-
zip((0 until Int.MaxValue).toObservable)
392+
zip(0 until Int.MaxValue)
357393
}
358394

359395
/**
@@ -3330,10 +3366,19 @@ object Observable {
33303366
* Write the function you pass so that it behaves as an Observable: It should invoke the
33313367
* Subscriber's `onNext`, `onError`, and `onCompleted` methods appropriately.
33323368
*
3369+
* You can `add` custom [[Subscription]]s to [[Subscriber]]. These [[Subscription]]s will be called
3370+
* <ul>
3371+
* <li>when someone calls `unsubscribe`.</li>
3372+
* <li>after `onCompleted` or `onError`.</li>
3373+
* </ul>
3374+
*
33333375
* See <a href="http://go.microsoft.com/fwlink/?LinkID=205219">Rx Design Guidelines (PDF)</a> for detailed
33343376
* information.
33353377
*
3336-
* @tparam T
3378+
* See `<a href="https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala">RxScalaDemo</a>.createExampleGood`
3379+
* and `<a href="https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala">RxScalaDemo</a>.createExampleGood2`.
3380+
*
3381+
* @param T
33373382
* the type of the items that this Observable emits
33383383
* @param f
33393384
* a function that accepts a `Subscriber[T]`, and invokes its `onNext`,

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,9 @@ class CompletenessTest extends JUnitSuite {
8787
"firstOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `.filter(condition).firstOrElse(default)`]",
8888
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]",
8989
"groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: GroupedObservable[TKey, T], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, (K, Observable[T]) => Observable[Any])",
90+
"ignoreElements()" -> "[use `filter(_ => false)`]",
9091
"lift(Operator[_ <: R, _ >: T])" -> "lift(Subscriber[R] => Subscriber[T])",
92+
"limit(Int)" -> "take(Int)",
9193
"mapWithIndex(Func2[_ >: T, Integer, _ <: R])" -> "[combine `zipWithIndex` with `map` or with a for comprehension]",
9294
"multicast(Subject[_ >: T, _ <: R])" -> "multicast(Subject[R])",
9395
"multicast(Func0[_ <: Subject[_ >: T, _ <: TIntermediate]], Func1[_ >: Observable[TIntermediate], _ <: Observable[TResult]])" -> "multicast(() => Subject[R], Observable[R] => Observable[U])",
@@ -118,6 +120,7 @@ class CompletenessTest extends JUnitSuite {
118120
"skipWhile(Func1[_ >: T, Boolean])" -> "dropWhile(T => Boolean)",
119121
"skipWhileWithIndex(Func2[_ >: T, Integer, Boolean])" -> unnecessary,
120122
"skipUntil(Observable[U])" -> "dropUntil(Observable[E])",
123+
"single(Func1[_ >: T, Boolean])" -> "[use `filter(predicate).single`]",
121124
"startWith(T)" -> "[use `item +: o`]",
122125
"startWith(Array[T])" -> "[use `Observable.items(items) ++ o`]",
123126
"startWith(Array[T], Scheduler)" -> "[use `Observable.items(items).subscribeOn(scheduler) ++ o`]",
@@ -151,6 +154,8 @@ class CompletenessTest extends JUnitSuite {
151154
"toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
152155
"window(Long, Long, TimeUnit)" -> "window(Duration, Duration)",
153156
"window(Long, Long, TimeUnit, Scheduler)" -> "window(Duration, Duration, Scheduler)",
157+
"zip(Observable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Observable[U], (T, U) => R)",
158+
"zip(Iterable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Iterable[U], (T, U) => R)",
154159

155160
// manually added entries for Java static methods
156161
"average(Observable[Integer])" -> averageProblem,

0 commit comments

Comments
 (0)