Skip to content

Commit ec2f6d6

Browse files
committed
Add delay variants to RxScala
1 parent 9e6e890 commit ec2f6d6

File tree

2 files changed

+66
-0
lines changed

2 files changed

+66
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,21 @@ class RxScalaDemo extends JUnitSuite {
642642
println(result)
643643
}
644644

645+
@Test def delayExample3(): Unit = {
646+
val o = List(100, 500, 200).toObservable.delay(
647+
(i: Int) => Observable.items(i).delay(i millis)
648+
)
649+
o.toBlockingObservable.foreach(println(_))
650+
}
651+
652+
@Test def delayExample4(): Unit = {
653+
val o = List(100, 500, 200).toObservable.delay(
654+
() => Observable.interval(500 millis).take(1),
655+
(i: Int) => Observable.items(i).delay(i millis)
656+
)
657+
o.toBlockingObservable.foreach(println(_))
658+
}
659+
645660
@Test def delaySubscriptionExample(): Unit = {
646661
val o = List(100L, 200L, 300L).toObservable.delaySubscription(2 seconds)
647662
val result = o.toBlockingObservable.toList

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2690,6 +2690,57 @@ trait Observable[+T]
26902690
toScalaObservable[T](asJavaObservable.delay(delay.length, delay.unit, scheduler))
26912691
}
26922692

2693+
/**
2694+
* Returns an Observable that delays the emissions of the source Observable via another Observable on a
2695+
* per-item basis.
2696+
* <p>
2697+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/delay.o.png">
2698+
* <p>
2699+
* Note: the resulting Observable will immediately propagate any `onError` notification
2700+
* from the source Observable.
2701+
*
2702+
* @param itemDelay a function that returns an Observable for each item emitted by the source Observable, which is
2703+
* then used to delay the emission of that item by the resulting Observable until the Observable
2704+
* returned from `itemDelay` emits an item
2705+
* @return an Observable that delays the emissions of the source Observable via another Observable on a per-item basis
2706+
*/
2707+
def delay(itemDelay: T => Observable[Any]): Observable[T] = {
2708+
val itemDelayJava = new Func1[T, rx.Observable[Any]] {
2709+
override def call(t: T): rx.Observable[Any] =
2710+
itemDelay(t).asJavaObservable.asInstanceOf[rx.Observable[Any]]
2711+
}
2712+
toScalaObservable[T](asJavaObservable.delay[Any](itemDelayJava))
2713+
}
2714+
2715+
/**
2716+
* Returns an Observable that delays the subscription to and emissions from the souce Observable via another
2717+
* Observable on a per-item basis.
2718+
* <p>
2719+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/delay.oo.png">
2720+
* <p>
2721+
* Note: the resulting Observable will immediately propagate any `onError` notification
2722+
* from the source Observable.
2723+
*
2724+
* @param subscriptionDelay a function that returns an Observable that triggers the subscription to the source Observable
2725+
* once it emits any item
2726+
* @param itemDelay a function that returns an Observable for each item emitted by the source Observable, which is
2727+
* then used to delay the emission of that item by the resulting Observable until the Observable
2728+
* returned from `itemDelay` emits an item
2729+
* @return an Observable that delays the subscription and emissions of the source Observable via another
2730+
* Observable on a per-item basis
2731+
*/
2732+
def delay(subscriptionDelay: () => Observable[Any], itemDelay: T => Observable[Any]): Observable[T] = {
2733+
val subscriptionDelayJava = new Func0[rx.Observable[Any]] {
2734+
override def call(): rx.Observable[Any] =
2735+
subscriptionDelay().asJavaObservable.asInstanceOf[rx.Observable[Any]]
2736+
}
2737+
val itemDelayJava = new Func1[T, rx.Observable[Any]] {
2738+
override def call(t: T): rx.Observable[Any] =
2739+
itemDelay(t).asJavaObservable.asInstanceOf[rx.Observable[Any]]
2740+
}
2741+
toScalaObservable[T](asJavaObservable.delay[Any, Any](subscriptionDelayJava, itemDelayJava))
2742+
}
2743+
26932744
/**
26942745
* Return an Observable that delays the subscription to the source Observable by a given amount of time.
26952746
*

0 commit comments

Comments
 (0)