Skip to content

Commit f4640ec

Browse files
committed
Add buffer variants to RxScala
1 parent 354c959 commit f4640ec

File tree

2 files changed

+43
-0
lines changed

2 files changed

+43
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,12 @@ class RxScalaDemo extends JUnitSuite {
142142
o.buffer(5).subscribe((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
143143
}
144144

145+
@Test def bufferExample() {
146+
val o = Observable.from(1 to 18).zip(Observable.interval(100 millis)).map(_._1)
147+
val boundary = Observable.interval(500 millis)
148+
o.buffer(boundary).toBlockingObservable.foreach((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
149+
}
150+
145151
@Test def windowExample() {
146152
(for ((o, i) <- Observable.from(1 to 18).window(5).zipWithIndex; n <- o)
147153
yield s"Observable#$i emits $n"

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,43 @@ trait Observable[+T]
565565
Observable.jObsOfListToScObsOfSeq(oJava.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
566566
}
567567

568+
/**
569+
* Returns an Observable that emits non-overlapping buffered items from the source Observable each time the
570+
* specified boundary Observable emits an item.
571+
* <p>
572+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/buffer8.png">
573+
* <p>
574+
* Completion of either the source or the boundary Observable causes the returned Observable to emit the
575+
* latest buffer and complete.
576+
*
577+
* @param boundary the boundary Observable
578+
* @return an Observable that emits buffered items from the source Observable when the boundary Observable
579+
* emits an item
580+
*/
581+
def buffer(boundary: Observable[Any]): Observable[Seq[T]] = {
582+
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]]
583+
toScalaObservable(thisJava.buffer(boundary.asJavaObservable)).map(_.asScala)
584+
}
585+
586+
/**
587+
* Returns an Observable that emits non-overlapping buffered items from the source Observable each time the
588+
* specified boundary Observable emits an item.
589+
* <p>
590+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/buffer8.png">
591+
* <p>
592+
* Completion of either the source or the boundary Observable causes the returned Observable to emit the
593+
* latest buffer and complete.
594+
*
595+
* @param boundary the boundary Observable
596+
* @param initialCapacity the initial capacity of each buffer chunk
597+
* @return an Observable that emits buffered items from the source Observable when the boundary Observable
598+
* emits an item
599+
*/
600+
def buffer(boundary: Observable[Any], initialCapacity: Int): Observable[Seq[T]] = {
601+
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]]
602+
toScalaObservable(thisJava.buffer(boundary.asJavaObservable, initialCapacity)).map(_.asScala)
603+
}
604+
568605
/**
569606
* Creates an Observable which produces windows of collected values. This Observable produces connected
570607
* non-overlapping windows. The current window is emitted and replaced with a new window when the

0 commit comments

Comments
 (0)