Skip to content

Commit dcbae87

Browse files
Scheduler and TestScheduler
1 parent 7c4d23a commit dcbae87

File tree

6 files changed

+215
-312
lines changed

6 files changed

+215
-312
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ package rx.lang.scala
1717

1818
import java.{ lang => jlang }
1919
import rx.util.functions._
20-
import rx.lang.scala.concurrency.GenericScheduler
2120

2221
/**
2322
* These function conversions convert between Scala functions and Rx `Func`s and `Action`s.
@@ -37,7 +36,7 @@ object ImplicitFunctionConversions {
3736

3837
implicit def scalaSchedulerToJavaScheduler(s: Scheduler): rx.Scheduler = s.asJava
3938

40-
implicit def javaSchedulerToScalaScheduler[S <: rx.Scheduler](s: S): GenericScheduler[S] = new GenericScheduler(s)
39+
implicit def javaSchedulerToScalaScheduler(s: rx.Scheduler): Scheduler = Scheduler(s)
4140

4241
implicit def scalaFunction1ToOnSubscribeFunc[T](f: rx.lang.scala.Observer[T] => Subscription) =
4342
new rx.Observable.OnSubscribeFunc[T] {
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package rx.lang.scala
2+
3+
import java.util.Date
4+
5+
import scala.concurrent.duration.Duration
6+
import scala.concurrent.duration.DurationInt
7+
import scala.language.postfixOps
8+
9+
import org.junit.Before
10+
import org.junit.Test
11+
import org.mockito.Matchers.any
12+
import org.mockito.Mockito.inOrder
13+
import org.mockito.Mockito.mock
14+
import org.mockito.Mockito.never
15+
import org.mockito.Mockito.times
16+
import org.mockito.Mockito.verify
17+
import org.scalatest.junit.JUnitSuite
18+
19+
import rx.lang.scala.ImplicitFunctionConversions.scalaFunction0ProducingUnitToAction0
20+
import rx.lang.scala.ImplicitFunctionConversions.schedulerActionToFunc2
21+
import rx.lang.scala.concurrency.TestScheduler
22+
23+
24+
/**
25+
* Represents an object that schedules units of work.
26+
*/
27+
trait Scheduler {
28+
def asJava: rx.Scheduler
29+
30+
/**
31+
* Schedules a cancelable action to be executed.
32+
*
33+
* @param state
34+
* State to pass into the action.
35+
* @param action
36+
* Action to schedule.
37+
* @return a subscription to be able to unsubscribe from action.
38+
*/
39+
def schedule[T](state: T, action: (Scheduler, T) => Subscription): Subscription = {
40+
asJava.schedule(state, action)
41+
}
42+
43+
/**
44+
* Schedules a cancelable action to be executed in delayTime.
45+
*
46+
* @param state
47+
* State to pass into the action.
48+
* @param action
49+
* Action to schedule.
50+
* @param delayTime
51+
* Time the action is to be delayed before executing.
52+
* @param unit
53+
* Time unit of the delay time.
54+
* @return a subscription to be able to unsubscribe from action.
55+
*/
56+
def schedule[T](state: T, action: (Scheduler, T) => Subscription, delayTime: Duration): Subscription = {
57+
asJava.schedule(state, action, delayTime.length, delayTime.unit)
58+
}
59+
60+
/**
61+
* Schedules a cancelable action to be executed periodically.
62+
* This default implementation schedules recursively and waits for actions to complete (instead of potentially executing
63+
* long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
64+
*
65+
* @param state
66+
* State to pass into the action.
67+
* @param action
68+
* The action to execute periodically.
69+
* @param initialDelay
70+
* Time to wait before executing the action for the first time.
71+
* @param period
72+
* The time interval to wait each time in between executing the action.
73+
* @return A subscription to be able to unsubscribe from action.
74+
*/
75+
def schedulePeriodically[T](state: T, action: (Scheduler, T) => Subscription, initialDelay: Duration, period: Duration): Subscription = {
76+
asJava.schedulePeriodically(state, action, initialDelay.length, initialDelay.unit.convert(period.length, period.unit), initialDelay.unit)
77+
}
78+
79+
/**
80+
* Schedules a cancelable action to be executed at dueTime.
81+
*
82+
* @param state
83+
* State to pass into the action.
84+
* @param action
85+
* Action to schedule.
86+
* @param dueTime
87+
* Time the action is to be executed. If in the past it will be executed immediately.
88+
* @return a subscription to be able to unsubscribe from action.
89+
*/
90+
def schedule[T](state: T, action: (Scheduler, T) => Subscription, dueTime: Date): Subscription = {
91+
asJava.schedule(state, action, dueTime)
92+
}
93+
94+
/**
95+
* Schedules an action to be executed.
96+
*
97+
* @param action
98+
* action
99+
* @return a subscription to be able to unsubscribe from action.
100+
*/
101+
def schedule(action: () => Unit): Subscription = {
102+
asJava.schedule(action)
103+
}
104+
105+
/**
106+
* Schedules an action to be executed in delayTime.
107+
*
108+
* @param action
109+
* action
110+
* @return a subscription to be able to unsubscribe from action.
111+
*/
112+
def schedule(action: () => Unit, delayTime: Duration): Subscription = {
113+
asJava.schedule(action, delayTime.length, delayTime.unit)
114+
}
115+
116+
/**
117+
* Schedules an action to be executed periodically.
118+
*
119+
* @param action
120+
* The action to execute periodically.
121+
* @param initialDelay
122+
* Time to wait before executing the action for the first time.
123+
* @param period
124+
* The time interval to wait each time in between executing the action.
125+
* @return A subscription to be able to unsubscribe from action.
126+
*/
127+
def schedulePeriodically(action: () => Unit, initialDelay: Duration, period: Duration): Subscription = {
128+
asJava.schedulePeriodically(action, initialDelay.length, initialDelay.unit.convert(period.length, period.unit), initialDelay.unit)
129+
}
130+
131+
/**
132+
* @return the scheduler's notion of current absolute time in milliseconds.
133+
*/
134+
def now: Long = {
135+
asJava.now
136+
}
137+
138+
/**
139+
* Parallelism available to a Scheduler.
140+
*
141+
* This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster.
142+
*
143+
* @return the scheduler's available degree of parallelism.
144+
*/
145+
def degreeOfParallelism: Int = {
146+
asJava.degreeOfParallelism
147+
}
148+
149+
}
150+
151+
/**
152+
* Provides constructors for Schedulers.
153+
*/
154+
object Scheduler {
155+
private class WrapJavaScheduler(val asJava: rx.Scheduler) extends Scheduler
156+
157+
/**
158+
* Constructs a Scala Scheduler from a Java Scheduler.
159+
*/
160+
def apply(s: rx.Scheduler): Scheduler = new WrapJavaScheduler(s)
161+
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/GenericScheduler.scala

Lines changed: 0 additions & 179 deletions
This file was deleted.

0 commit comments

Comments
 (0)