Skip to content

Commit f1a3fb2

Browse files
add rx.lang.scala.concurrency.Schedulers
1 parent add46e6 commit f1a3fb2

File tree

3 files changed

+70
-15
lines changed

3 files changed

+70
-15
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import rx.lang.scala._
2121
import scala.concurrent.duration._
2222
import org.junit.{Before, Test, Ignore}
2323
import org.junit.Assert._
24-
import rx.lang.scala.concurrency.NewThreadScheduler
24+
import rx.lang.scala.concurrency.Schedulers
2525
import rx.lang.scala.util.Timestamped
2626
import java.io.IOException
2727

@@ -169,10 +169,10 @@ class RxScalaDemo extends JUnitSuite {
169169

170170
@Test def schedulersExample() {
171171
val o = Observable.interval(100 millis).take(8)
172-
o.observeOn(NewThreadScheduler).subscribe(
172+
o.observeOn(Schedulers.newThread).subscribe(
173173
i => println(s"${i}a (on thread #${Thread.currentThread().getId()})")
174174
)
175-
o.observeOn(NewThreadScheduler).subscribe(
175+
o.observeOn(Schedulers.newThread).subscribe(
176176
i => println(s"${i}b (on thread #${Thread.currentThread().getId()})")
177177
)
178178
waitFor(o)
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package rx.lang.scala.concurrency
2+
3+
import rx.Scheduler
4+
import java.util.concurrent.Executor
5+
import java.util.concurrent.ScheduledExecutorService
6+
7+
/**
8+
* Factory methods for creating Schedulers.
9+
*/
10+
object Schedulers {
11+
12+
/**
13+
* Returns a [[rx.lang.scala.Scheduler]] that executes work immediately on the current thread.
14+
*/
15+
def immediate: Scheduler = rx.concurrency.Schedulers.immediate()
16+
17+
/**
18+
* Returns a [[rx.lang.scala.Scheduler]] that queues work on the current thread to be executed after the current work completes.
19+
*/
20+
def currentThread: Scheduler = rx.concurrency.Schedulers.currentThread()
21+
22+
/**
23+
* Returns a [[rx.lang.scala.Scheduler]] that creates a new {@link Thread} for each unit of work.
24+
*/
25+
def newThread: Scheduler = rx.concurrency.Schedulers.newThread
26+
27+
/**
28+
* Returns a [[rx.lang.scala.Scheduler]] that queues work on an [[java.util.concurrent.Executor]].
29+
*
30+
* Note that this does not support scheduled actions with a delay.
31+
*/
32+
def executor(executor: Executor): Scheduler = rx.concurrency.Schedulers.executor(executor)
33+
34+
/**
35+
* Returns a [[rx.lang.scala.Scheduler]] that queues work on an [[java.util.concurrent.ScheduledExecutorService]].
36+
*/
37+
def executor(executor: ScheduledExecutorService): Scheduler = rx.concurrency.Schedulers.executor(executor)
38+
39+
/**
40+
* Returns a [[rx.lang.scala.Scheduler]] intended for computational work.
41+
*
42+
* The implementation is backed by a [[java.util.concurrent.ScheduledExecutorService]] thread-pool sized to the number of CPU cores.
43+
*
44+
* This can be used for event-loops, processing callbacks and other computational work.
45+
*
46+
* Do not perform IO-bound work on this scheduler. Use [[rx.lang.scala.concurrency.Schedulers.threadPoolForIO]] instead.
47+
*/
48+
def threadPoolForComputation: Scheduler = rx.concurrency.Schedulers.threadPoolForComputation()
49+
50+
/**
51+
* [[rx.lang.scala.Scheduler]] intended for IO-bound work.
52+
*
53+
* The implementation is backed by an [[java.util.concurrent.Executor]] thread-pool that will grow as needed.
54+
*
55+
* This can be used for asynchronously performing blocking IO.
56+
*
57+
* Do not perform computational work on this scheduler. Use [[rx.lang.scala.concurrency.Schedulers.threadPoolForComputation]] instead.
58+
*/
59+
def threadPoolForIO: Scheduler = rx.concurrency.Schedulers.threadPoolForIO()
60+
61+
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/package.scala

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,11 @@ package rx.lang.scala
1818
import rx.concurrency.CurrentThreadScheduler
1919

2020
package object concurrency {
21-
/*
22-
TODO
23-
rx.concurrency.CurrentThreadScheduler
24-
rx.concurrency.ExecutorScheduler
25-
rx.concurrency.ImmediateScheduler
26-
rx.concurrency.NewThreadScheduler
27-
rx.concurrency.Schedulers
28-
rx.concurrency.TestScheduler
29-
*/
21+
// These classes are not exposed to Scala users, but are accessible through
22+
// rx.lang.scala.concurrency.Schedulers:
3023

31-
lazy val CurrentThreadScheduler = rx.concurrency.CurrentThreadScheduler.getInstance()
32-
lazy val NewThreadScheduler = rx.concurrency.NewThreadScheduler.getInstance()
33-
24+
// rx.concurrency.CurrentThreadScheduler
25+
// rx.concurrency.ExecutorScheduler
26+
// rx.concurrency.ImmediateScheduler
27+
// rx.concurrency.NewThreadScheduler
3428
}

0 commit comments

Comments
 (0)