Skip to content

Commit 784f676

Browse files
committed
Update docs for "apply" and add an example
1 parent 566e892 commit 784f676

File tree

2 files changed

+38
-1
lines changed

2 files changed

+38
-1
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -620,6 +620,34 @@ class RxScalaDemo extends JUnitSuite {
620620
o.take(1).subscribe(println(_))
621621
}
622622

623+
@Test def createExampleGood2() {
624+
import scala.io.{Codec, Source}
625+
626+
val rxscala = Observable[String](subscriber => {
627+
try {
628+
val input = new java.net.URL("http://rxscala.github.io/").openStream()
629+
subscriber.add(Subscription {
630+
input.close()
631+
})
632+
Source.fromInputStream(input)(Codec.UTF8).getLines()
633+
.takeWhile(_ => !subscriber.isUnsubscribed)
634+
.foreach(subscriber.onNext(_))
635+
if (!subscriber.isUnsubscribed) {
636+
subscriber.onCompleted()
637+
}
638+
}
639+
catch {
640+
case e: Throwable => if (!subscriber.isUnsubscribed) subscriber.onError(e)
641+
}
642+
}).subscribeOn(IOScheduler())
643+
644+
val count = rxscala.flatMap(_.split("\\W+").toSeq.toObservable)
645+
.map(_.toLowerCase)
646+
.filter(_ == "rxscala")
647+
.size
648+
println(s"RxScala appears ${count.toBlockingObservable.single} times in http://rxscala.github.io/")
649+
}
650+
623651
def output(s: String): Unit = println(s)
624652

625653
/** Subscribes to obs and waits until obs has completed. Note that if you subscribe to

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3330,10 +3330,19 @@ object Observable {
33303330
* Write the function you pass so that it behaves as an Observable: It should invoke the
33313331
* Subscriber's `onNext`, `onError`, and `onCompleted` methods appropriately.
33323332
*
3333+
* You can `add` custom [[Subscription]]s to [[Subscriber]]. These [[Subscription]]s will be called
3334+
* <ul>
3335+
* <li>when someone calls `unsubscribe`.</li>
3336+
* <li>after `onCompleted` or `onError`.</li>
3337+
* </ul>
3338+
*
33333339
* See <a href="http://go.microsoft.com/fwlink/?LinkID=205219">Rx Design Guidelines (PDF)</a> for detailed
33343340
* information.
33353341
*
3336-
* @tparam T
3342+
* See `<a href="https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala">RxScalaDemo</a>.createExampleGood`
3343+
* and `<a href="https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala">RxScalaDemo</a>.createExampleGood2`.
3344+
*
3345+
* @param T
33373346
* the type of the items that this Observable emits
33383347
* @param f
33393348
* a function that accepts a `Subscriber[T]`, and invokes its `onNext`,

0 commit comments

Comments
 (0)