Skip to content

Commit b4019b9

Browse files
add mergeDelayError and flattenDelayError
1 parent 55a552a commit b4019b9

File tree

2 files changed

+53
-1
lines changed

2 files changed

+53
-1
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1271,6 +1271,29 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
12711271
Observable[U](rx.Observable.merge(thisJava, thatJava))
12721272
}
12731273

1274+
/**
1275+
* This behaves like {@link #merge(Observable)} except that if any of the merged Observables
1276+
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
1277+
* refrain from propagating that error notification until all of the merged Observables have
1278+
* finished emitting items.
1279+
* <p>
1280+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/mergeDelayError.png">
1281+
* <p>
1282+
* Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only invoke the {@code onError} method of its
1283+
* Observers once.
1284+
* <p>
1285+
* This method allows an Observer to receive all successfully emitted items from all of the
1286+
* source Observables without being interrupted by an error notification from one of them.
1287+
*
1288+
* @param that
1289+
* an Observable to be merged
1290+
* @return an Observable that emits items that are the result of flattening the items emitted by
1291+
* {$code this} and {$code that}
1292+
*/
1293+
def mergeDelayError[U >: T](that: Observable[U]): Observable[U] = {
1294+
Observable[U](rx.Observable.mergeDelayError[U](this.asJava, that.asJava))
1295+
}
1296+
12741297
/**
12751298
* Flattens the sequence of Observables emitted by {@code this} into one Observable, without any
12761299
* transformation.
@@ -1291,6 +1314,31 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
12911314
Observable[U](o5)
12921315
}
12931316

1317+
/**
1318+
* This behaves like {@link #flatten(<:<)} except that if any of the merged Observables
1319+
* notify of an error via {@link Observer#onError onError}, this method will
1320+
* refrain from propagating that error notification until all of the merged Observables have
1321+
* finished emitting items.
1322+
* <p>
1323+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/mergeDelayError.png">
1324+
* <p>
1325+
* Even if multiple merged Observables send {@code onError} notifications, this method will only invoke the {@code onError} method of its
1326+
* Observers once.
1327+
* <p>
1328+
* This method allows an Observer to receive all successfully emitted items from all of the
1329+
* source Observables without being interrupted by an error notification from one of them.
1330+
*
1331+
* @return an Observable that emits items that are the result of flattening the items emitted by
1332+
* the Observables emitted by the this Observable
1333+
*/
1334+
def flattenDelayError[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
1335+
val o2: Observable[Observable[U]] = this
1336+
val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJava)
1337+
val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJava
1338+
val o5 = rx.Observable.mergeDelayError[U](o4)
1339+
Observable[U](o5)
1340+
}
1341+
12941342
/**
12951343
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
12961344
* <p>

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ class CompletenessTest extends JUnitSuite {
7474
"from(Iterable[_ <: T])" -> "apply(T*)",
7575
"merge(Observable[_ <: T], Observable[_ <: T])" -> "merge(Observable[U])",
7676
"merge(Observable[_ <: Observable[_ <: T]])" -> "flatten(<:<[Observable[T], Observable[Observable[U]]])",
77-
"mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[T])",
77+
"mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[U])",
78+
"mergeDelayError(Observable[_ <: Observable[_ <: T]])" -> "flattenDelayError(<:<[Observable[T], Observable[Observable[U]]])",
7879
"range(Int, Int)" -> "apply(Range)",
7980
"sequenceEqual(Observable[_ <: T], Observable[_ <: T])" -> "[use (first zip second) map (p => p._1 == p._2)]",
8081
"sequenceEqual(Observable[_ <: T], Observable[_ <: T], Func2[_ >: T, _ >: T, Boolean])" -> "[use (first zip second) map (p => equality(p._1, p._2))]",
@@ -105,6 +106,9 @@ class CompletenessTest extends JUnitSuite {
105106
}).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map(
106107
// merge 3-9:
107108
"merge(" + _ + ")" -> "[unnecessary because we can use Observable(o1, o2, ...).flatten instead]"
109+
).drop(2).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map(
110+
// mergeDelayError 3-9:
111+
"mergeDelayError(" + _ + ")" -> "[unnecessary because we can use Observable(o1, o2, ...).flattenDelayError instead]"
108112
).drop(2).toMap
109113

110114
def removePackage(s: String) = s.replaceAll("(\\w+\\.)+(\\w+)", "$2")

0 commit comments

Comments
 (0)