Skip to content

Commit 720619c

Browse files
committed
Merge branch 'master' into rxscala-more
Conflicts: language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala
2 parents f4640ec + d64b3a1 commit 720619c

File tree

20 files changed

+595
-463
lines changed

20 files changed

+595
-463
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package rx.lang.scala.examples
1717

1818
import java.io.IOException
19+
import java.util.concurrent.CountDownLatch
20+
import java.util.concurrent.TimeUnit
1921

2022
import scala.concurrent.duration.Duration
2123
import scala.concurrent.duration.DurationInt
@@ -857,4 +859,49 @@ class RxScalaDemo extends JUnitSuite {
857859
.timeInterval
858860
println(o.toBlockingObservable.toList)
859861
}
862+
863+
@Test def schedulerExample1(): Unit = {
864+
val latch = new CountDownLatch(1)
865+
val worker = IOScheduler().createWorker
866+
worker.schedule {
867+
println("Hello from Scheduler")
868+
latch.countDown()
869+
}
870+
latch.await(5, TimeUnit.SECONDS)
871+
}
872+
873+
@Test def schedulerExample2(): Unit = {
874+
val latch = new CountDownLatch(1)
875+
val worker = IOScheduler().createWorker
876+
worker.schedule(1 seconds) {
877+
println("Hello from Scheduler after 1 second")
878+
latch.countDown()
879+
}
880+
latch.await(5, TimeUnit.SECONDS)
881+
}
882+
883+
@Test def schedulerExample3(): Unit = {
884+
val worker = IOScheduler().createWorker
885+
var no = 1
886+
val subscription = worker.schedulePeriodically(initialDelay = 1 seconds, period = 100 millis) {
887+
println(s"Hello(${no}) from Scheduler")
888+
no += 1
889+
}
890+
TimeUnit.SECONDS.sleep(2)
891+
subscription.unsubscribe()
892+
}
893+
894+
@Test def schedulerExample4(): Unit = {
895+
val worker = IOScheduler().createWorker
896+
var no = 1
897+
def hello: Unit = {
898+
println(s"Hello(${no}) from Scheduler")
899+
no += 1
900+
worker.schedule(100 millis)(hello)
901+
}
902+
val subscription = worker.schedule(1 seconds)(hello)
903+
TimeUnit.SECONDS.sleep(2)
904+
subscription.unsubscribe()
905+
}
906+
860907
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala

Lines changed: 53 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,26 @@ trait Scheduler {
3131
/**
3232
* Parallelism available to a Scheduler.
3333
*
34-
* This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster.
34+
* This defaults to `Runtime.getRuntime().availableProcessors()` but can be overridden for use cases such as scheduling work on a computer cluster.
3535
*
3636
* @return the scheduler's available degree of parallelism.
3737
*/
38-
def degreeOfParallelism: Int = asJavaScheduler.degreeOfParallelism
38+
def parallelism: Int = asJavaScheduler.parallelism()
3939

4040
/**
4141
* @return the scheduler's notion of current absolute time in milliseconds.
4242
*/
4343
def now: Long = this.asJavaScheduler.now()
4444

45+
/**
46+
* Retrieve or create a new [[rx.lang.scala.Worker]] that represents serial execution of actions.
47+
* <p>
48+
* When work is completed it should be unsubscribed using [[rx.lang.scala.Worker unsubscribe]].
49+
* <p>
50+
* Work on a [[rx.lang.scala.Worker]] is guaranteed to be sequential.
51+
*
52+
* @return Inner representing a serial queue of actions to be executed
53+
*/
4554
def createWorker: Worker = this.asJavaScheduler.createWorker()
4655

4756
}
@@ -54,24 +63,55 @@ trait Worker extends Subscription {
5463
private [scala] val asJavaWorker: rx.Scheduler.Worker
5564

5665
/**
57-
* Schedules a cancelable action to be executed in delayTime.
66+
* Schedules an Action for execution at some point in the future.
67+
*
68+
* @param action the Action to schedule
69+
* @param delay time to wait before executing the action
70+
* @return a subscription to be able to unsubscribe the action (unschedule it if not executed)
5871
*/
59-
def schedule(action: Unit => Unit, delayTime: Duration): Subscription =
72+
def schedule(delay: Duration)(action: => Unit): Subscription = {
6073
this.asJavaWorker.schedule(
6174
new Action0 {
62-
override def call(): Unit = action()
75+
override def call(): Unit = action
6376
},
64-
delayTime.length,
65-
delayTime.unit)
77+
delay.length,
78+
delay.unit)
79+
}
80+
81+
/**
82+
* Schedules an Action for execution.
83+
*
84+
* @param action the Action to schedule
85+
* @return a subscription to be able to unsubscribe the action (unschedule it if not executed)
86+
*/
87+
def schedule(action: => Unit): Subscription = {
88+
this.asJavaWorker.schedule(
89+
new Action0 {
90+
override def call(): Unit = action
91+
}
92+
)
93+
}
6694

6795
/**
68-
* Schedules a cancelable action to be executed immediately.
96+
* Schedules a cancelable action to be executed periodically. This default implementation schedules
97+
* recursively and waits for actions to complete (instead of potentially executing long-running actions
98+
* concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
99+
*
100+
* @param action the Action to execute periodically
101+
* @param initialDelay time to wait before executing the action for the first time
102+
* @param period the time interval to wait each time in between executing the action
103+
* @return a subscription to be able to unsubscribe the action (unschedule it if not executed)
69104
*/
70-
def schedule(action: Unit => Unit): Subscription = this.asJavaWorker.schedule(
71-
new Action0 {
72-
override def call(): Unit = action()
73-
}
74-
)
105+
def schedulePeriodically(initialDelay: Duration, period: Duration)(action: => Unit): Subscription = {
106+
this.asJavaWorker.schedulePeriodically(
107+
new Action0 {
108+
override def call(): Unit = action
109+
},
110+
initialDelay.toNanos,
111+
period.toNanos,
112+
duration.NANOSECONDS
113+
)
114+
}
75115

76116
/**
77117
* @return the scheduler's notion of current absolute time in milliseconds.

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/ComputationScheduler.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@ import rx.lang.scala.Scheduler
55

66
object ComputationScheduler {
77
/**
8-
* {@link Scheduler} intended for computational work.
8+
* [[rx.lang.scala.Scheduler]] intended for computational work.
99
* <p>
1010
* This can be used for event-loops, processing callbacks and other computational work.
1111
* <p>
12-
* Do not perform IO-bound work on this scheduler. Use {@link IOScheduler()} instead.
12+
* Do not perform IO-bound work on this scheduler. Use [[rx.lang.scala.schedulers.IOScheduler]] instead.
1313
*
14-
* @return { @link Scheduler} for computation-bound work.
14+
* @return [[rx.lang.scala.Scheduler]] for computation-bound work.
1515
*/
1616
def apply(): ComputationScheduler = {
1717
new ComputationScheduler(rx.schedulers.Schedulers.computation())

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/IOScheduler.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@ import rx.lang.scala.Scheduler
1919

2020
object IOScheduler {
2121
/**
22-
* {@link Scheduler} intended for IO-bound work.
22+
* [[rx.lang.scala.Scheduler]] intended for IO-bound work.
2323
* <p>
24-
* The implementation is backed by an {@link Executor} thread-pool that will grow as needed.
24+
* The implementation is backed by an `Executor` thread-pool that will grow as needed.
2525
* <p>
2626
* This can be used for asynchronously performing blocking IO.
2727
* <p>
28-
* Do not perform computational work on this scheduler. Use {@link ComputationScheduler()} instead.
28+
* Do not perform computational work on this scheduler. Use [[rx.lang.scala.schedulers.ComputationScheduler]] instead.
2929
*
30-
* @return { @link ExecutorScheduler} for IO-bound work.
30+
* @return [[rx.lang.scala.Scheduler]] for IO-bound work
3131
*/
3232
def apply(): IOScheduler = {
3333
new IOScheduler(rx.schedulers.Schedulers.io)

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/NewThreadScheduler.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import rx.lang.scala.Scheduler
2020
object NewThreadScheduler {
2121

2222
/**
23-
* Returns a [[rx.lang.scala.Scheduler]] that creates a new {@link Thread} for each unit of work.
23+
* Returns a [[rx.lang.scala.Scheduler]] that creates a new `java.lang.Thread` for each unit of work.
2424
*/
2525
def apply(): NewThreadScheduler = {
2626
new NewThreadScheduler(rx.schedulers.Schedulers.newThread())

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/schedulers/TrampolineScheduler.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import rx.lang.scala.Scheduler
44

55
object TrampolineScheduler {
66
/**
7-
* {@link Scheduler} that queues work on the current thread to be executed after the current work completes.
7+
* [[rx.lang.scala.Scheduler]] that queues work on the current thread to be executed after the current work completes.
88
*/
99
def apply(): TrampolineScheduler = {
1010
new TrampolineScheduler(rx.schedulers.Schedulers.trampoline())

0 commit comments

Comments
 (0)