Skip to content

Commit d46af63

Browse files
committed
Add amb, delay and delaySubscription in rxjava-scala
1 parent a0ebbc7 commit d46af63

File tree

2 files changed

+103
-0
lines changed

2 files changed

+103
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,4 +533,35 @@ class RxScalaDemo extends JUnitSuite {
533533
println(result)
534534
}
535535

536+
@Test def ambExample(): Unit = {
537+
val o1 = List(100L, 200L, 300L).toObservable.delay(4 seconds)
538+
val o2 = List(1000L, 2000L, 3000L).toObservable.delay(2 seconds)
539+
val result = o1.amb(o2).toBlockingObservable.toList
540+
println(result)
541+
}
542+
543+
@Test def delayExample(): Unit = {
544+
val o = List(100L, 200L, 300L).toObservable.delay(2 seconds)
545+
val result = o.toBlockingObservable.toList
546+
println(result)
547+
}
548+
549+
@Test def delayExample2(): Unit = {
550+
val o = List(100L, 200L, 300L).toObservable.delay(2 seconds, IOScheduler())
551+
val result = o.toBlockingObservable.toList
552+
println(result)
553+
}
554+
555+
@Test def delaySubscriptionExample(): Unit = {
556+
val o = List(100L, 200L, 300L).toObservable.delaySubscription(2 seconds)
557+
val result = o.toBlockingObservable.toList
558+
println(result)
559+
}
560+
561+
@Test def delaySubscriptionExample2(): Unit = {
562+
val o = List(100L, 200L, 300L).toObservable.delaySubscription(2 seconds, IOScheduler())
563+
val result = o.toBlockingObservable.toList
564+
println(result)
565+
}
566+
536567
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2236,6 +2236,78 @@ trait Observable[+T]
22362236
def doOnEach(onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit): Observable[T] = {
22372237
toScalaObservable[T](asJavaObservable.doOnEach(Observer(onNext, onError,onCompleted)))
22382238
}
2239+
2240+
/**
2241+
* Given two Observables, mirror the one that first emits an item.
2242+
*
2243+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/amb.png">
2244+
*
2245+
* You can combine items emitted by two Observables so that they act like a single
2246+
* Observable by using the `merge` method.
2247+
*
2248+
* @param that
2249+
* an Observable competing to react first
2250+
* @return an Observable that emits the same sequence of items as whichever of `this` or `that` first emitted an item.
2251+
*/
2252+
def amb[U >: T](that: Observable[U]): Observable[U] = {
2253+
val thisJava: rx.Observable[_ <: U] = this.asJavaObservable
2254+
val thatJava: rx.Observable[_ <: U] = that.asJavaObservable
2255+
toScalaObservable[U](rx.Observable.amb(thisJava, thatJava))
2256+
}
2257+
2258+
/**
2259+
* Returns an Observable that emits the items emitted by the source Observable shifted forward in time by a
2260+
* specified delay. Error notifications from the source Observable are not delayed.
2261+
*
2262+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/delay.png">
2263+
*
2264+
* @param delay the delay to shift the source by
2265+
* @return the source Observable shifted in time by the specified delay
2266+
*/
2267+
def delay(delay: Duration): Observable[T] = {
2268+
toScalaObservable[T](asJavaObservable.delay(delay.length, delay.unit))
2269+
}
2270+
2271+
/**
2272+
* Returns an Observable that emits the items emitted by the source Observable shifted forward in time by a
2273+
* specified delay. Error notifications from the source Observable are not delayed.
2274+
*
2275+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/delay.s.png">
2276+
*
2277+
* @param delay the delay to shift the source by
2278+
* @param scheduler the Scheduler to use for delaying
2279+
* @return the source Observable shifted in time by the specified delay
2280+
*/
2281+
def delay(delay: Duration, scheduler: Scheduler): Observable[T] = {
2282+
toScalaObservable[T](asJavaObservable.delay(delay.length, delay.unit, scheduler))
2283+
}
2284+
2285+
/**
2286+
* Return an Observable that delays the subscription to the source Observable by a given amount of time.
2287+
*
2288+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/delaySubscription.png">
2289+
*
2290+
* @param delay the time to delay the subscription
2291+
* @return an Observable that delays the subscription to the source Observable by the given amount
2292+
*/
2293+
def delaySubscription(delay: Duration): Observable[T] = {
2294+
toScalaObservable[T](asJavaObservable.delaySubscription(delay.length, delay.unit))
2295+
}
2296+
2297+
/**
2298+
* Return an Observable that delays the subscription to the source Observable by a given amount of time,
2299+
* both waiting and subscribing on a given Scheduler.
2300+
*
2301+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/delaySubscription.s.png">
2302+
*
2303+
* @param delay the time to delay the subscription
2304+
* @param scheduler the Scheduler on which the waiting and subscription will happen
2305+
* @return an Observable that delays the subscription to the source Observable by a given
2306+
* amount, waiting and subscribing on the given Scheduler
2307+
*/
2308+
def delaySubscription(delay: Duration, scheduler: Scheduler): Observable[T] = {
2309+
toScalaObservable[T](asJavaObservable.delaySubscription(delay.length, delay.unit, scheduler))
2310+
}
22392311
}
22402312

22412313
/**

0 commit comments

Comments
 (0)