Skip to content

Commit c948a26

Browse files
committed
Fix issue #1187
1 parent 50b618e commit c948a26

File tree

2 files changed

+100
-12
lines changed

2 files changed

+100
-12
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package rx.lang.scala.examples
1717

1818
import java.io.IOException
19+
import java.util.concurrent.CountDownLatch
20+
import java.util.concurrent.TimeUnit
1921

2022
import scala.concurrent.duration.Duration
2123
import scala.concurrent.duration.DurationInt
@@ -790,4 +792,50 @@ class RxScalaDemo extends JUnitSuite {
790792
assertEquals(List(1, 2, 3, 4), o.toBlockingObservable.toList)
791793
}
792794

795+
@Test def schedulerExample1(): Unit = {
796+
val latch = new CountDownLatch(1)
797+
val worker = IOScheduler().createWorker
798+
worker.schedule {
799+
println("Hello from Scheduler")
800+
latch.countDown()
801+
}
802+
latch.await(5, TimeUnit.SECONDS)
803+
}
804+
805+
@Test def schedulerExample2(): Unit = {
806+
val latch = new CountDownLatch(1)
807+
val worker = IOScheduler().createWorker
808+
worker.schedule(
809+
{
810+
println("Hello from Scheduler after 1 second")
811+
latch.countDown()
812+
}, 1 seconds)
813+
latch.await(5, TimeUnit.SECONDS)
814+
}
815+
816+
@Test def schedulerExample3(): Unit = {
817+
val worker = IOScheduler().createWorker
818+
var no = 1
819+
val subscription = worker.schedulePeriodically(
820+
{
821+
println(s"Hello(${no}) from Scheduler")
822+
no += 1
823+
}, initialDelay = 1 seconds, period = 100 millis)
824+
TimeUnit.SECONDS.sleep(2)
825+
subscription.unsubscribe()
826+
}
827+
828+
@Test def schedulerExample4(): Unit = {
829+
val worker = IOScheduler().createWorker
830+
var no = 1
831+
def hello: Unit = {
832+
println(s"Hello(${no}) from Scheduler")
833+
no += 1
834+
worker.schedule(hello, 100 millis)
835+
}
836+
val subscription = worker.schedule(hello, 1 seconds)
837+
TimeUnit.SECONDS.sleep(2)
838+
subscription.unsubscribe()
839+
}
840+
793841
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala

Lines changed: 52 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ trait Scheduler {
3131
/**
3232
* Parallelism available to a Scheduler.
3333
*
34-
* This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster.
34+
* This defaults to `Runtime.getRuntime().availableProcessors()` but can be overridden for use cases such as scheduling work on a computer cluster.
3535
*
3636
* @return the scheduler's available degree of parallelism.
3737
*/
@@ -42,6 +42,15 @@ trait Scheduler {
4242
*/
4343
def now: Long = this.asJavaScheduler.now()
4444

45+
/**
46+
* Retrieve or create a new [[rx.lang.scala.Worker]] that represents serial execution of actions.
47+
* <p>
48+
* When work is completed it should be unsubscribed using [[rx.lang.scala.Worker unsubscribe]].
49+
* <p>
50+
* Work on a [[rx.lang.scala.Worker]] is guaranteed to be sequential.
51+
*
52+
* @return Inner representing a serial queue of actions to be executed
53+
*/
4554
def createWorker: Worker = this.asJavaScheduler.createWorker()
4655

4756
}
@@ -54,24 +63,55 @@ trait Worker extends Subscription {
5463
private [scala] val asJavaWorker: rx.Scheduler.Worker
5564

5665
/**
57-
* Schedules a cancelable action to be executed in delayTime.
66+
* Schedules an Action for execution at some point in the future.
67+
*
68+
* @param action the Action to schedule
69+
* @param delay time to wait before executing the action
70+
* @return a subscription to be able to unsubscribe the action (unschedule it if not executed)
5871
*/
59-
def schedule(action: Unit => Unit, delayTime: Duration): Subscription =
72+
def schedule(action: => Unit, delay: Duration): Subscription = {
6073
this.asJavaWorker.schedule(
6174
new Action0 {
62-
override def call(): Unit = action()
75+
override def call(): Unit = action
6376
},
64-
delayTime.length,
65-
delayTime.unit)
77+
delay.length,
78+
delay.unit)
79+
}
80+
81+
/**
82+
* Schedules an Action for execution.
83+
*
84+
* @param action the Action to schedule
85+
* @return a subscription to be able to unsubscribe the action (unschedule it if not executed)
86+
*/
87+
def schedule(action: => Unit): Subscription = {
88+
this.asJavaWorker.schedule(
89+
new Action0 {
90+
override def call(): Unit = action
91+
}
92+
)
93+
}
6694

6795
/**
68-
* Schedules a cancelable action to be executed immediately.
96+
* Schedules a cancelable action to be executed periodically. This default implementation schedules
97+
* recursively and waits for actions to complete (instead of potentially executing long-running actions
98+
* concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
99+
*
100+
* @param action the Action to execute periodically
101+
* @param initialDelay time to wait before executing the action for the first time
102+
* @param period the time interval to wait each time in between executing the action
103+
* @return a subscription to be able to unsubscribe the action (unschedule it if not executed)
69104
*/
70-
def schedule(action: Unit => Unit): Subscription = this.asJavaWorker.schedule(
71-
new Action0 {
72-
override def call(): Unit = action()
73-
}
74-
)
105+
def schedulePeriodically(action: => Unit, initialDelay: Duration, period: Duration): Subscription = {
106+
this.asJavaWorker.schedulePeriodically(
107+
new Action0 {
108+
override def call(): Unit = action
109+
},
110+
initialDelay.toNanos,
111+
period.toNanos,
112+
duration.NANOSECONDS
113+
)
114+
}
75115

76116
/**
77117
* @return the scheduler's notion of current absolute time in milliseconds.

0 commit comments

Comments
 (0)