Skip to content

Commit 171131c

Browse files
add a int-version of buffer and window...
...and detect another compiler bug
1 parent 2901c06 commit 171131c

File tree

2 files changed

+65
-32
lines changed

2 files changed

+65
-32
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scalaimpl/Observable.scala

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package rx.lang.scalaimpl
1919

2020
import org.scalatest.junit.JUnitSuite
21+
import scala.collection.Seq
2122

2223

2324
/**
@@ -29,6 +30,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
2930
extends AnyVal
3031
{
3132
import scala.collection.JavaConverters._
33+
import scala.collection.Seq
3234
import scala.concurrent.duration.Duration
3335
import rx.{Observable => JObservable}
3436
import rx.lang.scala.{Notification, Subscription, Scheduler, Observer}
@@ -218,25 +220,10 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
218220
* An {@link Observable} which produces connected non-overlapping buffers, which are emitted
219221
* when the current {@link Observable} created with the {@link Func0} argument produces a {@link rx.util.Closing} object.
220222
*/
221-
def buffer(bufferClosingSelector: () => Observable[Closing]) : Observable[java.util.List[_ <: T]] = {
223+
def buffer(bufferClosingSelector: () => Observable[Closing]) : Observable[Seq[T]] = {
222224
val f: rx.util.functions.Func0[_ <: rx.Observable[_ <: Closing]] = bufferClosingSelector().asJava
223-
val oJava: rx.Observable[_ <: java.util.List[_]] = asJava.buffer(f)
224-
val oScala1: Observable[java.util.List[_]] = new Observable[java.util.List[_]](oJava)
225-
oScala1.asInstanceOf[Observable[java.util.List[_ <: T]]]
226-
}
227-
228-
// TODO decide whether to return Java list or Scala list
229-
230-
def bufferReturningScalaList(bufferClosingSelector: () => Observable[Closing]) : Observable[List[T]] = {
231-
val f: rx.util.functions.Func0[_ <: rx.Observable[_ <: Closing]] = bufferClosingSelector().asJava
232-
val oJava: rx.Observable[_ <: java.util.List[_]] = asJava.buffer(f)
233-
val oScala1: Observable[java.util.List[_]] = new Observable[java.util.List[_]](oJava)
234-
val oScala2 = oScala1.map((lJava: java.util.List[_]) => {
235-
val bufferScala: scala.collection.mutable.Buffer[_] = lJava.asScala
236-
val listScala: List[Any] = bufferScala.toList
237-
listScala.asInstanceOf[List[T]]
238-
})
239-
oScala2
225+
val jObs: rx.Observable[_ <: java.util.List[_]] = asJava.buffer(f)
226+
Observable.jObsOfListToScObsOfSeq(jObs.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
240227
}
241228

242229
/**
@@ -275,7 +262,10 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
275262
* An {@link Observable} which produces connected non-overlapping buffers containing at most
276263
* "count" produced values.
277264
*/
278-
// public Observable<List<T>> buffer(int count)
265+
def buffer(count: Int): Observable[Seq[T]] = {
266+
val oJava: rx.Observable[_ <: java.util.List[_]] = asJava.buffer(count)
267+
Observable.jObsOfListToScObsOfSeq(oJava.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
268+
}
279269

280270
/**
281271
* Creates an Observable which produces buffers of collected values.
@@ -462,7 +452,17 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
462452
* An {@link Observable} which produces connected non-overlapping windows containing at most
463453
* "count" produced values.
464454
*/
465-
// public Observable<Observable<T>> window(int count)
455+
def window(count: Int): Observable[Observable[T]] = {
456+
// this line makes the compiler crash (if it's the last line of the method):
457+
// Observable.jObsOfJObsToScObsOfScObs(asJava.window(count))
458+
459+
// this line gives an error:
460+
// "type mismatch; found : rx.Observable[rx.Observable[_$1]] required: rx.Observable[rx.Observable[T]]"
461+
// Observable.jObsOfJObsToScObsOfScObs(asJava.window(count) : JObservable[JObservable[T]])
462+
463+
// workaround with a cast:
464+
Observable.jObsOfJObsToScObsOfScObs(asJava.window(count).asInstanceOf[JObservable[JObservable[T]]])
465+
}
466466

467467
/**
468468
* Creates an Observable which produces windows of collected values. This Observable produces windows every
@@ -941,7 +941,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
941941
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
942942
*/
943943
def fold[R](initialValue: R)(accumulator: (R, T) => R): Observable[R] = {
944-
new Observable(asJava.reduce(initialValue, accumulator))
944+
new Observable[R](asJava.reduce(initialValue, accumulator))
945945
}
946946
// corresponds to Java's
947947
// public <R> Observable<R> reduce(R initialValue, Func2<? super R, ? super T, ? extends R> accumulator)
@@ -1009,7 +1009,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
10091009
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
10101010
*/
10111011
def scan[R](initialValue: R)(accumulator: (R, T) => R): Observable[R] = {
1012-
new Observable(asJava.scan(initialValue, accumulator))
1012+
new Observable[R](asJava.scan(initialValue, accumulator))
10131013
}
10141014
// corresponds to Scala's
10151015
// public <R> Observable<R> scan(R initialValue, Func2<? super R, ? super T, ? extends R> accumulator)
@@ -1106,6 +1106,10 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
11061106
def takeWhileWithIndex(predicate: (T, Integer) => Boolean): Observable[T] = {
11071107
new Observable[T](asJava.takeWhileWithIndex(predicate))
11081108
}
1109+
1110+
def zipWithIndex: Observable[(T, Int)] = {
1111+
???
1112+
}
11091113

11101114
/**
11111115
* Returns an Observable that emits only the last <code>count</code> items emitted by the source
@@ -1276,13 +1280,26 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
12761280

12771281
object Observable {
12781282
import scala.collection.JavaConverters._
1283+
import scala.collection.immutable.Range
12791284
import scala.concurrent.duration.Duration
12801285
import scala.concurrent.Future
12811286
import rx.{Observable => JObservable}
12821287
import rx.lang.scala.{Notification, Subscription, Scheduler, Observer}
12831288
import rx.lang.scala.util._
12841289
import rx.lang.scalaimpl.ImplicitFunctionConversions._
1285-
1290+
1291+
private[scalaimpl]
1292+
def jObsOfListToScObsOfSeq[T](jObs: rx.Observable[_ <: java.util.List[T]]): Observable[Seq[T]] = {
1293+
val oScala1: Observable[java.util.List[T]] = new Observable[java.util.List[T]](jObs)
1294+
oScala1.map((lJava: java.util.List[T]) => lJava.asScala)
1295+
}
1296+
1297+
private[scalaimpl]
1298+
def jObsOfJObsToScObsOfScObs[T](jObs: rx.Observable[_ <: rx.Observable[_ <: T]]): Observable[Observable[T]] = {
1299+
val oScala1: Observable[rx.Observable[_ <: T]] = new Observable[rx.Observable[_ <: T]](jObs)
1300+
oScala1.map((oJava: rx.Observable[_ <: T]) => new Observable[T](oJava))
1301+
}
1302+
12861303
/**
12871304
* Creates an Observable that will execute the given function when an {@link Observer} subscribes to it.
12881305
* <p>
@@ -1308,7 +1325,7 @@ object Observable {
13081325
* function
13091326
*/
13101327
def apply[T](func: Observer[T] => Subscription): Observable[T] = {
1311-
new Observable(JObservable.create(func))
1328+
new Observable[T](JObservable.create(func))
13121329
}
13131330
// corresponds to Java's
13141331
// public static <T> Observable<T> create(OnSubscribeFunc<T> func)
@@ -1329,7 +1346,7 @@ object Observable {
13291346
* @return an Observable that invokes the {@link Observer}'s {@link Observer#onError onError} method when the Observer subscribes to it
13301347
*/
13311348
def apply(exception: Throwable): Observable[Nothing] = {
1332-
new Observable(JObservable.error(exception))
1349+
new Observable[Nothing](JObservable.error(exception))
13331350
}
13341351
// corresponds to Java's
13351352
// public static <T> Observable<T> error(Throwable exception)
@@ -1354,11 +1371,15 @@ object Observable {
13541371
* @return an Observable that emits each item in the source Array
13551372
*/
13561373
def apply[T](args: T*): Observable[T] = {
1357-
new Observable(JObservable.from(args.toIterable.asJava))
1374+
new Observable[T](JObservable.from(args.toIterable.asJava))
13581375
}
13591376
// corresponds to Java's
13601377
// public static <T> Observable<T> from(T... items)
13611378

1379+
def apply(range: Range): Observable[Int] = {
1380+
new Observable[Int](JObservable.from(range.toIterable.asJava))
1381+
}
1382+
13621383
// There is no method corresponding to
13631384
// public static Observable<Integer> range(int start, int count)
13641385
// because the Scala collection library provides enough methods to create Iterables.
@@ -1385,7 +1406,7 @@ object Observable {
13851406
* factory function
13861407
*/
13871408
def defer[T](observable: => Observable[T]): Observable[T] = {
1388-
new Observable(JObservable.defer(observable.asJava))
1409+
new Observable[T](JObservable.defer(observable.asJava))
13891410
}
13901411
// corresponds to Java's
13911412
// public static <T> Observable<T> defer(Func0<? extends Observable<? extends T>> observableFactory)
@@ -1410,7 +1431,7 @@ object Observable {
14101431
* @return an Observable that emits a single item and then completes
14111432
*/
14121433
def just[T](value: T): Observable[T] = {
1413-
new Observable(JObservable.just(value))
1434+
new Observable[T](JObservable.just(value))
14141435
}
14151436
// corresponds to Java's
14161437
// public static <T> Observable<T> just(T value)
@@ -1548,7 +1569,7 @@ object Observable {
15481569
* @return an Observable that never sends any items or notifications to an {@link Observer}
15491570
*/
15501571
def never: Observable[Nothing] = {
1551-
new Observable(JObservable.never())
1572+
new Observable[Nothing](JObservable.never())
15521573
}
15531574

15541575
// There is no method corresponding to
@@ -1623,12 +1644,12 @@ object Observable {
16231644
class WithFilter[+T] private[scalaimpl] (p: T => Boolean, wrapped: rx.Observable[_ <: T]) {
16241645
import rx.lang.scalaimpl.ImplicitFunctionConversions._
16251646

1626-
def map[B](f: T => B): Observable[B] = new Observable(wrapped.filter(p).map(f))
1647+
def map[B](f: T => B): Observable[B] = new Observable[B](wrapped.filter(p).map(f))
16271648
def flatMap[B](f: T => Observable[B]): Observable[B] = {
16281649
??? // TODO
16291650
}
16301651
def foreach(f: T => Unit): Unit = wrapped.filter(p).toBlockingObservable.forEach(f)
1631-
def withFilter(p: T => Boolean): Observable[T] = new Observable(wrapped.filter(p))
1652+
def withFilter(p: T => Boolean): Observable[T] = new Observable[T](wrapped.filter(p))
16321653
}
16331654

16341655
class UnitTestSuite extends JUnitSuite {

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scalatests/RxScalaDemo.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import org.junit.{Before, Test, Ignore}
99

1010
import org.junit.Assert._
1111

12-
@Ignore // Since this doesn't do automatic testing.
12+
//@Ignore // Since this doesn't do automatic testing.
1313
class RxScalaDemo extends JUnitSuite {
1414

1515
def output(s: String): Unit = println(s)
@@ -92,4 +92,16 @@ class RxScalaDemo extends JUnitSuite {
9292
sleep(2500)
9393
}
9494

95+
@Test def rangeAndBufferExample() {
96+
val o = Observable(1 to 18)
97+
o.buffer(5).subscribe((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
98+
}
99+
100+
@Test def windowExample() {
101+
// this will be nicer once we have zipWithIndex
102+
(for ((o, i) <- Observable(1 to 18).window(5) zip Observable(0 until 4); n <- o)
103+
yield s"Observable#$i emits $n")
104+
.subscribe(output(_))
105+
}
106+
95107
}

0 commit comments

Comments
 (0)