Skip to content

Commit 7c4d23a

Browse files
work on Schedulers
rx.lang.scala.concurrency.UnitTest.testInterval fails
1 parent 79ce193 commit 7c4d23a

File tree

6 files changed

+209
-27
lines changed

6 files changed

+209
-27
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package rx.lang.scala
1717

1818
import java.{ lang => jlang }
1919
import rx.util.functions._
20+
import rx.lang.scala.concurrency.GenericScheduler
2021

2122
/**
2223
* These function conversions convert between Scala functions and Rx `Func`s and `Action`s.
@@ -27,6 +28,17 @@ import rx.util.functions._
2728
object ImplicitFunctionConversions {
2829
import language.implicitConversions
2930

31+
implicit def schedulerActionToFunc2[T](action: (Scheduler, T) => Subscription) =
32+
new Func2[rx.Scheduler, T, Subscription] {
33+
def call(s: rx.Scheduler, t: T): Subscription = {
34+
action(s, t)
35+
}
36+
}
37+
38+
implicit def scalaSchedulerToJavaScheduler(s: Scheduler): rx.Scheduler = s.asJava
39+
40+
implicit def javaSchedulerToScalaScheduler[S <: rx.Scheduler](s: S): GenericScheduler[S] = new GenericScheduler(s)
41+
3042
implicit def scalaFunction1ToOnSubscribeFunc[T](f: rx.lang.scala.Observer[T] => Subscription) =
3143
new rx.Observable.OnSubscribeFunc[T] {
3244
def onSubscribe(obs: rx.Observer[_ >: T]): rx.Subscription = {
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
package rx.lang.scala.concurrency
2+
3+
import rx.Subscription
4+
import java.util.Date
5+
import scala.concurrent.duration.Duration
6+
import rx.lang.scala.ImplicitFunctionConversions._
7+
import rx.util.functions.Func2
8+
import rx.lang.scala.Scheduler
9+
import rx.lang.scala.Observer
10+
import org.scalatest.junit.JUnitSuite
11+
import org.junit.Before
12+
import rx.lang.scala.Observable
13+
14+
15+
class GenericScheduler[+S <: rx.Scheduler](val asJava: S) extends AnyVal {
16+
/**
17+
* Schedules a cancelable action to be executed.
18+
*
19+
* @param state
20+
* State to pass into the action.
21+
* @param action
22+
* Action to schedule.
23+
* @return a subscription to be able to unsubscribe from action.
24+
*/
25+
def schedule[T](state: T, action: (Scheduler, T) => Subscription): Subscription = {
26+
asJava.schedule(state, action)
27+
}
28+
29+
/**
30+
* Schedules a cancelable action to be executed in delayTime.
31+
*
32+
* @param state
33+
* State to pass into the action.
34+
* @param action
35+
* Action to schedule.
36+
* @param delayTime
37+
* Time the action is to be delayed before executing.
38+
* @param unit
39+
* Time unit of the delay time.
40+
* @return a subscription to be able to unsubscribe from action.
41+
*/
42+
def schedule[T](state: T, action: (Scheduler, T) => Subscription, delayTime: Duration): Subscription = {
43+
asJava.schedule(state, action, delayTime.length, delayTime.unit)
44+
}
45+
46+
/**
47+
* Schedules a cancelable action to be executed periodically.
48+
* This default implementation schedules recursively and waits for actions to complete (instead of potentially executing
49+
* long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
50+
*
51+
* @param state
52+
* State to pass into the action.
53+
* @param action
54+
* The action to execute periodically.
55+
* @param initialDelay
56+
* Time to wait before executing the action for the first time.
57+
* @param period
58+
* The time interval to wait each time in between executing the action.
59+
* @return A subscription to be able to unsubscribe from action.
60+
*/
61+
def schedulePeriodically[T](state: T, action: (Scheduler, T) => Subscription, initialDelay: Duration, period: Duration): Subscription = {
62+
asJava.schedulePeriodically(state, action, initialDelay.length, initialDelay.unit.convert(period.length, period.unit), initialDelay.unit)
63+
}
64+
65+
/**
66+
* Schedules a cancelable action to be executed at dueTime.
67+
*
68+
* @param state
69+
* State to pass into the action.
70+
* @param action
71+
* Action to schedule.
72+
* @param dueTime
73+
* Time the action is to be executed. If in the past it will be executed immediately.
74+
* @return a subscription to be able to unsubscribe from action.
75+
*/
76+
def schedule[T](state: T, action: (Scheduler, T) => Subscription, dueTime: Date): Subscription = {
77+
asJava.schedule(state, action, dueTime)
78+
}
79+
80+
/**
81+
* Schedules an action to be executed.
82+
*
83+
* @param action
84+
* action
85+
* @return a subscription to be able to unsubscribe from action.
86+
*/
87+
def schedule(action: () => Unit): Subscription = {
88+
asJava.schedule(action)
89+
}
90+
91+
/**
92+
* Schedules an action to be executed in delayTime.
93+
*
94+
* @param action
95+
* action
96+
* @return a subscription to be able to unsubscribe from action.
97+
*/
98+
def schedule(action: () => Unit, delayTime: Duration): Subscription = {
99+
asJava.schedule(action, delayTime.length, delayTime.unit)
100+
}
101+
102+
/**
103+
* Schedules an action to be executed periodically.
104+
*
105+
* @param action
106+
* The action to execute periodically.
107+
* @param initialDelay
108+
* Time to wait before executing the action for the first time.
109+
* @param period
110+
* The time interval to wait each time in between executing the action.
111+
* @return A subscription to be able to unsubscribe from action.
112+
*/
113+
def schedulePeriodically(action: () => Unit, initialDelay: Duration, period: Duration): Subscription = {
114+
asJava.schedulePeriodically(action, initialDelay.length, initialDelay.unit.convert(period.length, period.unit), initialDelay.unit)
115+
}
116+
117+
/**
118+
* @return the scheduler's notion of current absolute time in milliseconds.
119+
*/
120+
def now: Long = {
121+
asJava.now
122+
}
123+
124+
/**
125+
* Parallelism available to a Scheduler.
126+
*
127+
* This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster.
128+
*
129+
* @return the scheduler's available degree of parallelism.
130+
*/
131+
def degreeOfParallelism: Int = {
132+
asJava.degreeOfParallelism
133+
}
134+
135+
}
136+
137+
class UnitTest extends JUnitSuite {
138+
import org.mockito.Matchers._
139+
import org.mockito.Mockito._
140+
import org.junit.Test
141+
import org.junit.Before
142+
import scala.concurrent.duration._
143+
import scala.language.postfixOps
144+
145+
var scheduler: TestScheduler = null
146+
var observer: Observer[Long] = null
147+
var observer2: Observer[Long] = null
148+
149+
@Before def before() {
150+
scheduler = TestScheduler()
151+
observer = mock(classOf[rx.Observer[Long]])
152+
observer2 = mock(classOf[rx.Observer[Long]])
153+
}
154+
155+
@Test def testInterval() {
156+
val w = Observable.interval(1 seconds)
157+
val sub = w.subscribe(observer)
158+
159+
verify(observer, never()).onNext(0L)
160+
verify(observer, never()).onCompleted()
161+
verify(observer, never()).onError(any(classOf[Throwable]))
162+
163+
scheduler.advanceTimeTo(2 seconds)
164+
165+
val inOrdr = inOrder(observer);
166+
inOrdr.verify(observer, times(1)).onNext(0L)
167+
inOrdr.verify(observer, times(1)).onNext(1L)
168+
inOrdr.verify(observer, never()).onNext(2L)
169+
verify(observer, never()).onCompleted();
170+
verify(observer, never()).onError(any(classOf[Throwable]))
171+
172+
sub.unsubscribe();
173+
scheduler.advanceTimeTo(4 seconds)
174+
verify(observer, never()).onNext(2L)
175+
verify(observer, times(1)).onCompleted()
176+
verify(observer, never()).onError(any(classOf[Throwable]))
177+
}
178+
}
179+

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/Schedulers.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package rx.lang.scala.concurrency
22

3-
import rx.Scheduler
43
import java.util.concurrent.Executor
54
import java.util.concurrent.ScheduledExecutorService
5+
import rx.lang.scala.Scheduler
6+
import rx.lang.scala.ImplicitFunctionConversions._
67

78
/**
89
* Factory methods for creating Schedulers.
Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,14 @@
11
package rx.lang.scala.concurrency
22

33
import scala.concurrent.duration.Duration
4-
import rx.lang.scala.Subscription
5-
import rx.lang.scala.Scheduler
6-
import rx.lang.scala.ImplicitFunctionConversions._
7-
import rx.util.functions.Func2
8-
import java.util.concurrent.TimeUnit
94

10-
// TODO make a Scheduler interface in Java, and a DefaultScheduler Java and one for Scala
115

12-
class TestScheduler extends Scheduler {
6+
7+
class TestScheduler {
138

149
private val asJava = new rx.concurrency.TestScheduler
1510

16-
override def now: Long = asJava.now
11+
/*override*/ def now: Long = asJava.now
1712

1813
def advanceTimeBy(time: Duration) {
1914
asJava.advanceTimeBy(time.length, time.unit)
@@ -26,21 +21,13 @@ class TestScheduler extends Scheduler {
2621
def triggerActions() {
2722
asJava.triggerActions()
2823
}
24+
}
2925

30-
def schedule[T](state: T, action: (Scheduler, T) => Subscription): Subscription = {
31-
asJava.schedule(state, action)
32-
}
33-
34-
def schedule[T](state: T, action: (Scheduler, T) => Subscription, delay: Duration): Subscription = {
35-
asJava.schedule(state, action, delay.length, delay.unit)
36-
}
37-
38-
override def schedule[T](state: T, action: Func2[_ >: Scheduler, _ >: T, _ <: Subscription]): Subscription = {
39-
asJava.schedule(state, action)
40-
}
4126

42-
override def schedule[T](state: T, action: Func2[_ >: Scheduler, _ >: T, _ <: Subscription], delayTime: Long, unit: TimeUnit): Subscription = {
43-
asJava.schedule(state, action, delayTime, unit)
27+
object TestScheduler {
28+
def apply(): TestScheduler = {
29+
//rx.lang.scala.ImplicitFunctionConversions.javaSchedulerToScalaScheduler(new rx.concurrency.TestScheduler())
30+
new TestScheduler
4431
}
45-
4632
}
33+

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/concurrency/package.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@ import rx.concurrency.CurrentThreadScheduler
2121
* Provides schedulers.
2222
*/
2323
package object concurrency {
24-
// These classes are not exposed to Scala users, but are accessible through
25-
// rx.lang.scala.concurrency.Schedulers:
24+
25+
//type TestScheduler = GenericScheduler[rx.concurrency.TestScheduler]
26+
27+
// These classes are not exposed to Scala users, but are accessible through rx.lang.scala.concurrency.Schedulers:
2628

2729
// rx.concurrency.CurrentThreadScheduler
2830
// rx.concurrency.ExecutorScheduler
2931
// rx.concurrency.ImmediateScheduler
3032
// rx.concurrency.NewThreadScheduler
31-
}
33+
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package rx.lang
1717

1818
import java.util.concurrent.TimeUnit
1919
import java.util.Date
20+
import rx.lang.scala.concurrency.GenericScheduler
2021

2122
/*
2223
* Note that:
@@ -227,7 +228,7 @@ package object scala {
227228

228229
type Observer[-T] = rx.Observer[_ >: T]
229230

230-
type Scheduler = rx.Scheduler
231+
type Scheduler = GenericScheduler[rx.Scheduler]
231232

232233
type Subscription = rx.Subscription
233234

0 commit comments

Comments
 (0)