Skip to content

Commit e359f3c

Browse files
Fixed buglet in join binding.
Simplified types for groupByUntil, window, buffer, and empty to use Any and Nothing.
1 parent a995623 commit e359f3c

File tree

4 files changed

+47
-47
lines changed

4 files changed

+47
-47
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/Olympics.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ object Olympics {
8080
// So we don't use this:
8181
// Observable.interval(fourYears).take(1).map(i => neverUsedDummyMedal).filter(m => false)
8282
// But we just return empty, which completes immediately
83-
Observable.empty[Medal]
83+
Observable.empty
8484
}
8585

8686
}

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -243,13 +243,10 @@ class RxScalaDemo extends JUnitSuite {
243243
}
244244

245245
@Test def groupByUntilExample() {
246-
val numbers = Observable.interval(250 millis) take 14
247-
val grouped = numbers.groupByUntil[Long, Long](
248-
{case x => x % 2},
249-
{case (key, obs) => obs filter {case x => x == 7}}
250-
)
251-
val sequenced = (grouped map {case (key, obs) => obs.toSeq}).flatten
252-
sequenced subscribe {x => println(s"Emitted group: $x")}
246+
val numbers = Observable.interval(250 millis).take(14)
247+
val grouped = numbers.groupByUntil[Long](x => x % 2, {case (key, obs) => obs.filter(x => x == 7)})
248+
val sequenced = (grouped.map({ case (key, obs) => obs.toSeq })).flatten
249+
sequenced.subscribe(x => println(s"Emitted group: $x"))
253250
}
254251

255252

@@ -312,7 +309,7 @@ class RxScalaDemo extends JUnitSuite {
312309
}
313310

314311
@Test def averageExample() {
315-
println(doubleAverage(Observable.empty[Double]).toBlockingObservable.single)
312+
println(doubleAverage(Observable.empty).toBlockingObservable.single)
316313
println(doubleAverage(List(0.0).toObservable).toBlockingObservable.single)
317314
println(doubleAverage(List(4.44).toObservable).toBlockingObservable.single)
318315
println(doubleAverage(List(1, 2, 3.5).toObservable).toBlockingObservable.single)

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 34 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -352,10 +352,10 @@ trait Observable[+T]
352352
* @return
353353
* An [[rx.lang.scala.Observable]] which produces buffers which are created and emitted when the specified [[rx.lang.scala.Observable]]s publish certain objects.
354354
*/
355-
def buffer[Opening, Closing](openings: Observable[Opening], closings: Opening => Observable[Closing]): Observable[Seq[T]] = {
355+
def buffer[Opening](openings: Observable[Opening], closings: Opening => Observable[Any]): Observable[Seq[T]] = {
356356
val opening: rx.Observable[_ <: Opening] = openings.asJavaObservable
357-
val closing: Func1[_ >: Opening, _ <: rx.Observable[_ <: Closing]] = (o: Opening) => closings(o).asJavaObservable
358-
val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer[Opening, Closing](opening, closing)
357+
val closing: Func1[_ >: Opening, _ <: rx.Observable[_ <: Any]] = (o: Opening) => closings(o).asJavaObservable
358+
val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer[Opening, Any](opening, closing)
359359
Observable.jObsOfListToScObsOfSeq(jObs.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
360360
}
361361

@@ -539,9 +539,9 @@ trait Observable[+T]
539539
* An [[rx.lang.scala.Observable]] which produces connected non-overlapping windows, which are emitted
540540
* when the current [[rx.lang.scala.Observable]] created with the function argument produces an object.
541541
*/
542-
def window[Closing](closings: () => Observable[Closing]): Observable[Observable[T]] = {
543-
val func : Func0[_ <: rx.Observable[_ <: Closing]] = closings().asJavaObservable
544-
val o1: rx.Observable[_ <: rx.Observable[_]] = asJavaObservable.window[Closing](func)
542+
def window(closings: () => Observable[Any]): Observable[Observable[T]] = {
543+
val func : Func0[_ <: rx.Observable[_ <: Any]] = closings().asJavaObservable
544+
val o1: rx.Observable[_ <: rx.Observable[_]] = asJavaObservable.window[Any](func)
545545
val o2 = Observable.items(o1).map((x: rx.Observable[_]) => {
546546
val x2 = x.asInstanceOf[rx.Observable[_ <: T]]
547547
toScalaObservable[T](x2)
@@ -565,9 +565,9 @@ trait Observable[+T]
565565
* @return
566566
* An [[rx.lang.scala.Observable]] which produces windows which are created and emitted when the specified [[rx.lang.scala.Observable]]s publish certain objects.
567567
*/
568-
def window[Opening, Closing](openings: Observable[Opening], closings: Opening => Observable[Closing]) = {
568+
def window[Opening](openings: Observable[Opening], closings: Opening => Observable[Any]) = {
569569
Observable.jObsOfJObsToScObsOfScObs(
570-
asJavaObservable.window[Opening, Closing](openings.asJavaObservable, (op: Opening) => closings(op).asJavaObservable))
570+
asJavaObservable.window[Opening, Any](openings.asJavaObservable, (op: Opening) => closings(op).asJavaObservable))
571571
: Observable[Observable[T]] // SI-7818
572572
}
573573

@@ -1335,15 +1335,13 @@ trait Observable[+T]
13351335
* an observable that emits a single Closing when the group should be closed.
13361336
* @tparam K
13371337
* the type of the keys returned by the discriminator function.
1338-
* @tparam Closing
1339-
* the type of the element emitted from the closings observable.
13401338
* @return an Observable that emits `(key, observable)` pairs, where `observable`
13411339
* contains all items for which `f` returned `key` before `closings` emits a value.
13421340
*/
1343-
def groupByUntil[K, Closing](f: T => K, closings: (K, Observable[T])=>Observable[Closing]): Observable[(K, Observable[T])] = {
1344-
val fclosing: Func1[_ >: rx.observables.GroupedObservable[K, _ <: T], _ <: rx.Observable[_ <: Closing]] =
1341+
def groupByUntil[K](f: T => K, closings: (K, Observable[T])=>Observable[Any]): Observable[(K, Observable[T])] = {
1342+
val fclosing: Func1[_ >: rx.observables.GroupedObservable[K, _ <: T], _ <: rx.Observable[_ <: Any]] =
13451343
(jGrObs: rx.observables.GroupedObservable[K, _ <: T]) => closings(jGrObs.getKey, toScalaObservable[T](jGrObs)).asJavaObservable
1346-
val o1 = asJavaObservable.groupByUntil[K, Closing](f, fclosing) : rx.Observable[_ <: rx.observables.GroupedObservable[K, _ <: T]]
1344+
val o1 = asJavaObservable.groupByUntil[K, Any](f, fclosing) : rx.Observable[_ <: rx.observables.GroupedObservable[K, _ <: T]]
13471345
val func = (o: rx.observables.GroupedObservable[K, _ <: T]) => (o.getKey, toScalaObservable[T](o))
13481346
toScalaObservable[(K, Observable[T])](o1.map[(K, Observable[T])](func))
13491347
}
@@ -1353,7 +1351,7 @@ trait Observable[+T]
13531351
* <p>
13541352
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/join_.png">
13551353
*
1356-
* @param inner
1354+
* @param other
13571355
* the second Observable to join items from
13581356
* @param leftDurationSelector
13591357
* a function to select a duration for each item emitted by the source Observable,
@@ -1370,24 +1368,26 @@ trait Observable[+T]
13701368
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#join">RxJava Wiki: join()</a>
13711369
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229750.aspx">MSDN: Observable.Join</a>
13721370
*/
1373-
def join[S, LeftClosing, RightClosing, R] (
1374-
inner: Observable[S],
1375-
leftDurationSelector: T=>Observable[LeftClosing],
1376-
rightDurationSelector: S=>Observable[RightClosing],
1377-
resultSelector: (T,S) => R
1378-
): Observable[R] = {
1371+
def join[S, R] (
1372+
other: Observable[S],
1373+
leftDurationSelector: T => Observable[Any],
1374+
rightDurationSelector: S => Observable[Any],
1375+
resultSelector: (T,S) => R
1376+
): Observable[R] = {
13791377

13801378
val outer : rx.Observable[_ <: T] = this.asJavaObservable
1381-
val left: Func1[_ >: T, _<: rx.Observable[_ <: LeftClosing]] = (t: T) => leftDurationSelector(t).asJavaObservable
1382-
val right: Func1[_ >: S, _<: rx.Observable[_ <: RightClosing]] = (s: S) => rightDurationSelector(s).asJavaObservable
1383-
1384-
val o1 = outer.asInstanceOf[rx.Observable[T]].join[S, LeftClosing, RightClosing, R](
1385-
inner.asJavaObservable.asInstanceOf[rx.Observable[S]],
1386-
left. asInstanceOf[Func1[T, rx.Observable[LeftClosing]]],
1387-
right.asInstanceOf[Func1[S, rx.Observable[RightClosing]]],
1388-
resultSelector.asInstanceOf[Func2[T,S,R]])
1389-
1390-
toScalaObservable[R](o1)
1379+
val inner : rx.Observable[_ <: S] = other.asJavaObservable
1380+
val left: Func1[_ >: T, _<: rx.Observable[_ <: Any]] = (t: T) => leftDurationSelector(t).asJavaObservable
1381+
val right: Func1[_ >: S, _<: rx.Observable[_ <: Any]] = (s: S) => rightDurationSelector(s).asJavaObservable
1382+
val f: Func2[_>: T, _ >: S, _ <: R] = resultSelector
1383+
1384+
toScalaObservable[R](
1385+
outer.asInstanceOf[rx.Observable[T]].join[S, Any, Any, R](
1386+
inner.asInstanceOf[rx.Observable[S]],
1387+
left. asInstanceOf[Func1[T, rx.Observable[Any]]],
1388+
right.asInstanceOf[Func1[S, rx.Observable[Any]]],
1389+
f.asInstanceOf[Func2[T,S,R]])
1390+
)
13911391
}
13921392

13931393
/**
@@ -2177,18 +2177,15 @@ object Observable {
21772177
* <p>
21782178
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/empty.s.png">
21792179
*
2180-
* @param scheduler the scheduler to call the
2181-
[[rx.lang.scala.Observer#onCompleted onCompleted]] method
2182-
* @param T the type of the items (ostensibly) emitted by the Observable
21832180
* @return an Observable that returns no data to the [[rx.lang.scala.Observer]] and
21842181
* immediately invokes the [[rx.lang.scala.Observer]]r's
21852182
* [[rx.lang.scala.Observer#onCompleted onCompleted]] method with the
21862183
* specified scheduler
21872184
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#empty-error-and-never">RxJava Wiki: empty()</a>
21882185
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229066.aspx">MSDN: Observable.Empty Method (IScheduler)</a>
21892186
*/
2190-
def empty[T]: Observable[T] = {
2191-
toScalaObservable(rx.Observable.empty[T]())
2187+
def empty: Observable[Nothing] = {
2188+
toScalaObservable(rx.Observable.empty[Nothing]())
21922189
}
21932190

21942191
/**
@@ -2200,16 +2197,15 @@ object Observable {
22002197
*
22012198
* @param scheduler the scheduler to call the
22022199
[[rx.lang.scala.Observer#onCompleted onCompleted]] method
2203-
* @param T the type of the items (ostensibly) emitted by the Observable
22042200
* @return an Observable that returns no data to the [[rx.lang.scala.Observer]] and
22052201
* immediately invokes the [[rx.lang.scala.Observer]]r's
22062202
* [[rx.lang.scala.Observer#onCompleted onCompleted]] method with the
22072203
* specified scheduler
22082204
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#empty-error-and-never">RxJava Wiki: empty()</a>
22092205
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229066.aspx">MSDN: Observable.Empty Method (IScheduler)</a>
22102206
*/
2211-
def empty[T](scheduler: Scheduler): Observable[T] = {
2212-
toScalaObservable(rx.Observable.empty[T](scalaSchedulerToJavaScheduler(scheduler)))
2207+
def empty(scheduler: Scheduler): Observable[Nothing] = {
2208+
toScalaObservable(rx.Observable.empty[Nothing](scalaSchedulerToJavaScheduler(scheduler)))
22132209
}
22142210

22152211
/**

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,13 @@ class ObservableTests extends JUnitSuite {
123123
assertEquals(6, o.toBlockingObservable.single)
124124
}
125125

126+
@Test def testJoin() {
127+
val xs = Observable.items(1,2,3)
128+
val ys = Observable.items("a")
129+
val zs = xs.join[String,String](ys, x => Observable.never, y => Observable.never, (x,y) => y+x)
130+
assertEquals(List("a1", "a2", "a3"),zs.toBlockingObservable.toList)
131+
}
132+
126133
/*
127134
@Test def testHead() {
128135
val observer = mock(classOf[Observer[Int]])

0 commit comments

Comments
 (0)