Skip to content

Commit 1495968

Browse files
add concat, length, retry; rename replay()->replay, merge->flatten
1 parent 5a813d6 commit 1495968

File tree

4 files changed

+130
-54
lines changed

4 files changed

+130
-54
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 70 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,23 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
157157
Observable(JObservable.concat(o1, o2))
158158
}
159159

160-
160+
/**
161+
* Returns an Observable that emits the items emitted by two or more Observables, one after the
162+
* other.
163+
* <p>
164+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/concat.png">
165+
*
166+
* @return an Observable that emits items that are the result of combining the items emitted by
167+
* the source Observables, one after the other
168+
*/
169+
def concat[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
170+
val o2: Observable[Observable[U]] = this
171+
val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJava)
172+
val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJava
173+
val o5 = rx.Observable.concat[U](o4)
174+
Observable[U](o5)
175+
}
176+
161177
/**
162178
* Wraps this Observable in another Observable that ensures that the resulting
163179
* Observable is chronologically well-behaved.
@@ -905,7 +921,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
905921
* @return a pair of a start function and an {@link Observable} such that when the start function
906922
* is called, the Observable starts to emit items to its {@link Observer}s
907923
*/
908-
def replay(): (() => Subscription, Observable[T]) = {
924+
def replay: (() => Subscription, Observable[T]) = {
909925
val javaCO = asJava.replay()
910926
(() => javaCO.connect(), Observable[T](javaCO))
911927
}
@@ -1234,8 +1250,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
12341250
val o5 = rx.Observable.switchOnNext[U](o4)
12351251
Observable[U](o5)
12361252
}
1237-
// TODO naming: follow C# (switch) or Java (switchOnNext)?
1238-
// public static <T> Observable<T> switchOnNext(Observable<? extends Observable<? extends T>> sequenceOfSequences)
1253+
// Naming: We follow C# (switch), not Java (switchOnNext), because Java just had to avoid clash with keyword
12391254

12401255
/**
12411256
* Flattens two Observables into one Observable, without any transformation.
@@ -1263,12 +1278,12 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
12631278
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
12641279
* <p>
12651280
* You can combine the items emitted by multiple Observables so that they act like a single
1266-
* Observable, by using the {@code merge} method.
1281+
* Observable by using this method.
12671282
*
12681283
* @return an Observable that emits items that are the result of flattening the items emitted
12691284
* by the Observables emitted by {@code this}
12701285
*/
1271-
def merge[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
1286+
def flatten[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
12721287
val o2: Observable[Observable[U]] = this
12731288
val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJava)
12741289
val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJava
@@ -1601,6 +1616,55 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
16011616
// TODO once https://github.com/Netflix/RxJava/issues/395 is fixed
16021617
//}
16031618

1619+
/**
1620+
* Returns an Observable that counts the total number of elements in the source Observable.
1621+
* <p>
1622+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/count.png">
1623+
*
1624+
* @return an Observable emitting the number of counted elements of the source Observable
1625+
* as its single item.
1626+
*/
1627+
def length: Observable[Int] = {
1628+
Observable[Integer](asJava.count()).map(_.intValue())
1629+
}
1630+
1631+
/**
1632+
* Retry subscription to origin Observable upto given retry count.
1633+
* <p>
1634+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/retry.png">
1635+
* <p>
1636+
* If {@link Observer#onError} is invoked the source Observable will be re-subscribed to as many times as defined by retryCount.
1637+
* <p>
1638+
* Any {@link Observer#onNext} calls received on each attempt will be emitted and concatenated together.
1639+
* <p>
1640+
* For example, if an Observable fails on first time but emits [1, 2] then succeeds the second time and
1641+
* emits [1, 2, 3, 4, 5] then the complete output would be [1, 2, 1, 2, 3, 4, 5, onCompleted].
1642+
*
1643+
* @param retryCount
1644+
* Number of retry attempts before failing.
1645+
* @return Observable with retry logic.
1646+
*/
1647+
def retry(retryCount: Int): Observable[T] = {
1648+
Observable[T](asJava.retry(retryCount))
1649+
}
1650+
1651+
/**
1652+
* Retry subscription to origin Observable whenever onError is called (infinite retry count).
1653+
* <p>
1654+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/retry.png">
1655+
* <p>
1656+
* If {@link Observer#onError} is invoked the source Observable will be re-subscribed to.
1657+
* <p>
1658+
* Any {@link Observer#onNext} calls received on each attempt will be emitted and concatenated together.
1659+
* <p>
1660+
* For example, if an Observable fails on first time but emits [1, 2] then succeeds the second time and
1661+
* emits [1, 2, 3, 4, 5] then the complete output would be [1, 2, 1, 2, 3, 4, 5, onCompleted].
1662+
* @return Observable with retry logic.
1663+
*/
1664+
def retry: Observable[T] = {
1665+
Observable[T](asJava.retry())
1666+
}
1667+
16041668
/**
16051669
* Converts an Observable into a {@link BlockingObservable} (an Observable with blocking
16061670
* operators).
@@ -1756,9 +1820,6 @@ object Observable {
17561820
def just[T](value: T): Observable[T] = {
17571821
Observable[T](JObservable.just(value))
17581822
}
1759-
1760-
// TODO we have merge and concat (++) as binary instance methods, but do we also need them as
1761-
// static methods with arity > 2?
17621823

17631824
/**
17641825
* This behaves like {@link #merge(java.util.List)} except that if any of the merged Observables

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/Olympics.scala

Lines changed: 45 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -7,41 +7,51 @@ object Olympics {
77
case class Medal(val year: Int, val games: String, val discipline: String, val medal: String, val athlete: String, val country: String)
88

99
def mountainBikeMedals: Observable[Medal] = Observable(
10-
Medal(1996, "Atlanta 1996", "cross-country men", "Gold", "Bart BRENTJENS", "Netherlands"),
11-
Medal(1996, "Atlanta 1996", "cross-country women", "Gold", "Paola PEZZO", "Italy"),
12-
Medal(1996, "Atlanta 1996", "cross-country men", "Silver", "Thomas FRISCHKNECHT", "Switzerland"),
13-
Medal(1996, "Atlanta 1996", "cross-country women", "Silver", "Alison SYDOR", "Canada"),
14-
Medal(1996, "Atlanta 1996", "cross-country men", "Bronze", "Miguel MARTINEZ", "France"),
15-
Medal(1996, "Atlanta 1996", "cross-country women", "Bronze", "Susan DEMATTEI", "United States of America")
16-
) ++ fourYearsEmpty ++ Observable(
17-
Medal(2000, "Sydney 2000", "cross-country women", "Gold", "Paola PEZZO", "Italy"),
18-
Medal(2000, "Sydney 2000", "cross-country women", "Silver", "Barbara BLATTER", "Switzerland"),
19-
Medal(2000, "Sydney 2000", "cross-country women", "Bronze", "Marga FULLANA", "Spain"),
20-
Medal(2000, "Sydney 2000", "cross-country men", "Gold", "Miguel MARTINEZ", "France"),
21-
Medal(2000, "Sydney 2000", "cross-country men", "Silver", "Filip MEIRHAEGHE", "Belgium"),
22-
Medal(2000, "Sydney 2000", "cross-country men", "Bronze", "Christoph SAUSER", "Switzerland")
23-
) ++ fourYearsEmpty ++ Observable(
24-
Medal(2004, "Athens 2004", "cross-country men", "Gold", "Julien ABSALON", "France"),
25-
Medal(2004, "Athens 2004", "cross-country men", "Silver", "Jose Antonio HERMIDA RAMOS", "Spain"),
26-
Medal(2004, "Athens 2004", "cross-country men", "Bronze", "Bart BRENTJENS", "Netherlands"),
27-
Medal(2004, "Athens 2004", "cross-country women", "Gold", "Gunn-Rita DAHLE", "Norway"),
28-
Medal(2004, "Athens 2004", "cross-country women", "Silver", "Marie-Helene PREMONT", "Canada"),
29-
Medal(2004, "Athens 2004", "cross-country women", "Bronze", "Sabine SPITZ", "Germany")
30-
) ++ fourYearsEmpty ++ Observable(
31-
Medal(2008, "Beijing 2008", "cross-country women", "Gold", "Sabine SPITZ", "Germany"),
32-
Medal(2008, "Beijing 2008", "cross-country women", "Silver", "Maja WLOSZCZOWSKA", "Poland"),
33-
Medal(2008, "Beijing 2008", "cross-country women", "Bronze", "Irina KALENTYEVA", "Russian Federation"),
34-
Medal(2008, "Beijing 2008", "cross-country men", "Gold", "Julien ABSALON", "France"),
35-
Medal(2008, "Beijing 2008", "cross-country men", "Silver", "Jean-Christophe PERAUD", "France"),
36-
Medal(2008, "Beijing 2008", "cross-country men", "Bronze", "Nino SCHURTER", "Switzerland")
37-
) ++ fourYearsEmpty ++ Observable(
38-
Medal(2012, "London 2012", "cross-country men", "Gold", "Jaroslav KULHAVY", "Czech Republic"),
39-
Medal(2012, "London 2012", "cross-country men", "Silver", "Nino SCHURTER", "Switzerland"),
40-
Medal(2012, "London 2012", "cross-country men", "Bronze", "Marco Aurelio FONTANA", "Italy"),
41-
Medal(2012, "London 2012", "cross-country women", "Gold", "Julie BRESSET", "France"),
42-
Medal(2012, "London 2012", "cross-country women", "Silver", "Sabine SPITZ", "Germany"),
43-
Medal(2012, "London 2012", "cross-country women", "Bronze", "Georgia GOULD", "United States of America")
44-
)
10+
Observable(
11+
Medal(1996, "Atlanta 1996", "cross-country men", "Gold", "Bart BRENTJENS", "Netherlands"),
12+
Medal(1996, "Atlanta 1996", "cross-country women", "Gold", "Paola PEZZO", "Italy"),
13+
Medal(1996, "Atlanta 1996", "cross-country men", "Silver", "Thomas FRISCHKNECHT", "Switzerland"),
14+
Medal(1996, "Atlanta 1996", "cross-country women", "Silver", "Alison SYDOR", "Canada"),
15+
Medal(1996, "Atlanta 1996", "cross-country men", "Bronze", "Miguel MARTINEZ", "France"),
16+
Medal(1996, "Atlanta 1996", "cross-country women", "Bronze", "Susan DEMATTEI", "United States of America")
17+
),
18+
fourYearsEmpty,
19+
Observable(
20+
Medal(2000, "Sydney 2000", "cross-country women", "Gold", "Paola PEZZO", "Italy"),
21+
Medal(2000, "Sydney 2000", "cross-country women", "Silver", "Barbara BLATTER", "Switzerland"),
22+
Medal(2000, "Sydney 2000", "cross-country women", "Bronze", "Marga FULLANA", "Spain"),
23+
Medal(2000, "Sydney 2000", "cross-country men", "Gold", "Miguel MARTINEZ", "France"),
24+
Medal(2000, "Sydney 2000", "cross-country men", "Silver", "Filip MEIRHAEGHE", "Belgium"),
25+
Medal(2000, "Sydney 2000", "cross-country men", "Bronze", "Christoph SAUSER", "Switzerland")
26+
),
27+
fourYearsEmpty,
28+
Observable(
29+
Medal(2004, "Athens 2004", "cross-country men", "Gold", "Julien ABSALON", "France"),
30+
Medal(2004, "Athens 2004", "cross-country men", "Silver", "Jose Antonio HERMIDA RAMOS", "Spain"),
31+
Medal(2004, "Athens 2004", "cross-country men", "Bronze", "Bart BRENTJENS", "Netherlands"),
32+
Medal(2004, "Athens 2004", "cross-country women", "Gold", "Gunn-Rita DAHLE", "Norway"),
33+
Medal(2004, "Athens 2004", "cross-country women", "Silver", "Marie-Helene PREMONT", "Canada"),
34+
Medal(2004, "Athens 2004", "cross-country women", "Bronze", "Sabine SPITZ", "Germany")
35+
),
36+
fourYearsEmpty,
37+
Observable(
38+
Medal(2008, "Beijing 2008", "cross-country women", "Gold", "Sabine SPITZ", "Germany"),
39+
Medal(2008, "Beijing 2008", "cross-country women", "Silver", "Maja WLOSZCZOWSKA", "Poland"),
40+
Medal(2008, "Beijing 2008", "cross-country women", "Bronze", "Irina KALENTYEVA", "Russian Federation"),
41+
Medal(2008, "Beijing 2008", "cross-country men", "Gold", "Julien ABSALON", "France"),
42+
Medal(2008, "Beijing 2008", "cross-country men", "Silver", "Jean-Christophe PERAUD", "France"),
43+
Medal(2008, "Beijing 2008", "cross-country men", "Bronze", "Nino SCHURTER", "Switzerland")
44+
),
45+
fourYearsEmpty,
46+
Observable(
47+
Medal(2012, "London 2012", "cross-country men", "Gold", "Jaroslav KULHAVY", "Czech Republic"),
48+
Medal(2012, "London 2012", "cross-country men", "Silver", "Nino SCHURTER", "Switzerland"),
49+
Medal(2012, "London 2012", "cross-country men", "Bronze", "Marco Aurelio FONTANA", "Italy"),
50+
Medal(2012, "London 2012", "cross-country women", "Gold", "Julie BRESSET", "France"),
51+
Medal(2012, "London 2012", "cross-country women", "Silver", "Sabine SPITZ", "Germany"),
52+
Medal(2012, "London 2012", "cross-country women", "Bronze", "Georgia GOULD", "United States of America")
53+
)
54+
).concat
4555

4656
// speed it up :D
4757
val fourYears = 4000.millis

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,19 +107,19 @@ class RxScalaDemo extends JUnitSuite {
107107
Observable.interval(period.millis).map(n => s"Obs-$period emits $n")
108108
}
109109

110-
@Test def mergeManyExample() {
110+
@Test def flattenManyExample() {
111111
val o = Observable.interval(500 millis).map(n => myInterval((n+1)*100))
112112
val stopper = Observable.interval(5 seconds)
113-
o.merge.takeUntil(stopper).toBlockingObservable.foreach(println(_))
113+
o.flatten.takeUntil(stopper).toBlockingObservable.foreach(println(_))
114114
}
115115

116-
@Test def mergeSomeExample() {
116+
@Test def fattenSomeExample() {
117117
// To merge some observables which are all known already:
118118
Observable(
119119
Observable.interval(200 millis),
120120
Observable.interval(400 millis),
121121
Observable.interval(800 millis)
122-
).merge.take(12).toBlockingObservable.foreach(println(_))
122+
).flatten.take(12).toBlockingObservable.foreach(println(_))
123123
}
124124

125125
@Test def rangeAndBufferExample() {
@@ -205,7 +205,7 @@ class RxScalaDemo extends JUnitSuite {
205205
println("Observable for modulo" + modulo + " started")
206206

207207
if (firstOnly) numbers.take(1) else numbers
208-
}).merge.toBlockingObservable.foreach(println(_))
208+
}).flatten.toBlockingObservable.foreach(println(_))
209209
}
210210

211211
@Test def timingTest1() {
@@ -217,7 +217,7 @@ class RxScalaDemo extends JUnitSuite {
217217
println("Observable for modulo" + modulo + " started at t = " + (System.currentTimeMillis - t0))
218218
numbers.take(1) // <- TODO very unexpected
219219
//numbers
220-
}).merge.toBlockingObservable.foreach(println(_))
220+
}).flatten.toBlockingObservable.foreach(println(_))
221221
}
222222

223223
@Test def groupByExample() {

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ class CompletenessTest extends JUnitSuite {
1313

1414
val unnecessary = "[considered unnecessary in Scala land]"
1515

16+
val deprecated = "[deprecated in RxJava]"
17+
1618
val averageProblem = "[We can't have a general average method because Scala's Numeric does not have " +
1719
"scalar multiplication (we would need to calculate (1.0/numberOfElements)*sum). " +
1820
"You can use fold instead to accumulate sum and numberOfElements and divide at the end.]"
@@ -26,6 +28,7 @@ class CompletenessTest extends JUnitSuite {
2628
"all(Func1[_ >: T, Boolean])" -> "forall(T => Boolean)",
2729
"buffer(Long, Long, TimeUnit)" -> "buffer(Duration, Duration)",
2830
"buffer(Long, Long, TimeUnit, Scheduler)" -> "buffer(Duration, Duration, Scheduler)",
31+
"count()" -> "length",
2932
"dematerialize()" -> "dematerialize(<:<[T, Notification[U]])",
3033
"first(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate,
3134
"firstOrDefault(T)" -> "firstOrElse(=> U)",
@@ -61,13 +64,14 @@ class CompletenessTest extends JUnitSuite {
6164
"averageFloats(Observable[Float])" -> averageProblem,
6265
"averageLongs(Observable[Long])" -> averageProblem,
6366
"create(OnSubscribeFunc[T])" -> "apply(Observer[T] => Subscription)",
67+
"concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])",
6468
"defer(Func0[_ <: Observable[_ <: T]])" -> "defer(=> Observable[T])",
6569
"empty()" -> "apply(T*)",
6670
"error(Throwable)" -> "apply(Throwable)",
6771
"from(Array[T])" -> "apply(T*)",
6872
"from(Iterable[_ <: T])" -> "apply(T*)",
6973
"merge(Observable[_ <: T], Observable[_ <: T])" -> "merge(Observable[U])",
70-
"merge(Observable[_ <: Observable[_ <: T]])" -> "merge(<:<[Observable[T], Observable[Observable[U]]])",
74+
"merge(Observable[_ <: Observable[_ <: T]])" -> "flatten(<:<[Observable[T], Observable[Observable[U]]])",
7175
"mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[T])",
7276
"range(Int, Int)" -> "apply(Range)",
7377
"sequenceEqual(Observable[_ <: T], Observable[_ <: T])" -> "[use (first zip second) map (p => p._1 == p._2)]",
@@ -77,7 +81,8 @@ class CompletenessTest extends JUnitSuite {
7781
"sumFloats(Observable[Float])" -> "sum(Numeric[U])",
7882
"sumLongs(Observable[Long])" -> "sum(Numeric[U])",
7983
"synchronize(Observable[T])" -> "synchronize",
80-
"switchDo(Observable[_ <: Observable[_ <: T]])" -> "switch",
84+
"switchDo(Observable[_ <: Observable[_ <: T]])" -> deprecated,
85+
"switchOnNext(Observable[_ <: Observable[_ <: T]])" -> "switch(<:<[Observable[T], Observable[Observable[U]]])",
8186
"zip(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "[use instance method zip and map]",
8287
"zip(Observable[_ <: Observable[_]], FuncN[_ <: R])" -> "[use zip in companion object and map]",
8388
"zip(Iterable[_ <: Observable[_]], FuncN[_ <: R])" -> "[use zip in companion object and map]"
@@ -86,7 +91,7 @@ class CompletenessTest extends JUnitSuite {
8691
"startWith(" + _ + ")" -> "[unnecessary because we can just use ++ instead]"
8792
).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map(
8893
// concat 2-9
89-
"concat(" + _ + ")" -> "[unnecessary because we can use ++ instead]"
94+
"concat(" + _ + ")" -> "[unnecessary because we can use ++ instead or Observable(o1, o2, ...).concat]"
9095
).drop(1).toMap ++ List.iterate("T", 10)(s => s + ", T").map(
9196
// all 10 overloads of from:
9297
"from(" + _ + ")" -> "apply(T*)"
@@ -97,7 +102,7 @@ class CompletenessTest extends JUnitSuite {
97102
("zip(" + obsArgs + "Func" + i + "[" + funcParams + "_ <: R])", unnecessary)
98103
}).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map(
99104
// merge 3-9:
100-
"merge(" + _ + ")" -> "[unnecessary because we can use Observable(o1, o2, ...).merge instead]"
105+
"merge(" + _ + ")" -> "[unnecessary because we can use Observable(o1, o2, ...).flatten instead]"
101106
).drop(2).toMap
102107

103108
def removePackage(s: String) = s.replaceAll("(\\w+\\.)+(\\w+)", "$2")

0 commit comments

Comments
 (0)