Skip to content

Commit f6fe8e7

Browse files
- Updated schedulers to reflext 0.17 changes.
- Add timer bindings
1 parent dc15e2b commit f6fe8e7

15 files changed

+161
-269
lines changed
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
#Tue Sep 03 10:20:57 PDT 2013
1+
#Wed Feb 05 12:05:54 CET 2014
22
distributionBase=GRADLE_USER_HOME
33
distributionPath=wrapper/dists
44
zipStoreBase=GRADLE_USER_HOME
55
zipStorePath=wrapper/dists
6-
distributionUrl=http\://services.gradle.org/distributions/gradle-1.10-bin.zip
6+
distributionUrl=http\://services.gradle.org/distributions/gradle-1.10-all.zip

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/JavaConversions.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,12 @@ object JavaConversions {
3333
implicit def toScalaSubscription(s: rx.Subscription): Subscription = Subscription(s)
3434

3535
implicit def scalaSchedulerToJavaScheduler(s: Scheduler): rx.Scheduler = s.asJavaScheduler
36-
3736
implicit def javaSchedulerToScalaScheduler(s: rx.Scheduler): Scheduler = Scheduler(s)
3837

38+
implicit def scalaInnerToJavaInner(s: Inner): rx.Scheduler.Inner = s.asJavaInner
39+
implicit def javaInnerToScalaInner(s: rx.Scheduler.Inner): Inner = Inner(s)
40+
41+
3942
implicit def toJavaObserver[T](s: Observer[T]): rx.Observer[_ >: T] = s.asJavaObserver
4043

4144
implicit def toScalaObserver[T](s: rx.Observer[_ >: T]): Observer[T] = Observer(s)

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Notification.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ object Notification {
9898
* The item passed to the onNext method.
9999
*/
100100
def apply[T](value: T): Notification[T] = {
101-
Notification(new rx.Notification[T](value))
101+
Notification(rx.Notification.createOnNext[T](value))
102102
}
103103

104104
/**
@@ -128,7 +128,7 @@ object Notification {
128128
* The exception passed to the onNext method.
129129
*/
130130
def apply[T](error: Throwable): Notification[T] = {
131-
Notification(new rx.Notification[T](error))
131+
Notification(rx.Notification.createOnError[T](error))
132132
}
133133

134134
/**
@@ -156,7 +156,7 @@ object Notification {
156156
* Constructor for onCompleted notifications.
157157
*/
158158
def apply[T](): Notification[T] = {
159-
Notification(new rx.Notification())
159+
Notification(rx.Notification.createOnCompleted[T]())
160160
}
161161

162162
/**

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package rx.lang.scala
1919
import rx.util.functions.FuncN
2020
import rx.Observable.OnSubscribeFunc
2121
import rx.lang.scala.observables.ConnectableObservable
22+
import scala.concurrent.duration
2223

2324

2425
/**
@@ -2342,17 +2343,53 @@ object Observable {
23422343
*
23432344
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/interval.png">
23442345
*
2345-
* @param duration
2346+
* @param period
23462347
* duration between two consecutive numbers
23472348
* @param scheduler
23482349
* the scheduler to use
23492350
* @return An Observable that emits a number each time interval.
23502351
*/
2351-
def interval(duration: Duration, scheduler: Scheduler): Observable[Long] = {
2352-
toScalaObservable[java.lang.Long](rx.Observable.interval(duration.length, duration.unit, scheduler)).map(_.longValue())
2352+
def interval(period: Duration, scheduler: Scheduler): Observable[Long] = {
2353+
toScalaObservable[java.lang.Long](rx.Observable.interval(period.length, period.unit, scheduler)).map(_.longValue())
2354+
}
2355+
2356+
/**
2357+
* Return an Observable that emits a 0L after the {@code initialDelay} and ever increasing
2358+
* numbers after each {@code period} of time thereafter, on a specified Scheduler.
2359+
* <p>
2360+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timer.ps.png">
2361+
*
2362+
* @param initialDelay
2363+
* the initial delay time to wait before emitting the first value of 0L
2364+
* @param period
2365+
* the period of time between emissions of the subsequent numbers
2366+
* @return an Observable that emits a 0L after the { @code initialDelay} and ever increasing
2367+
* numbers after each { @code period} of time thereafter, while running on the given { @code scheduler}
2368+
*/
2369+
def timer(initialDelay: Duration, period: Duration): Observable[Long] = {
2370+
toScalaObservable[java.lang.Long](rx.Observable.timer(initialDelay.toNanos, period.toNanos, duration.NANOSECONDS)).map(_.longValue())
23532371
/*XXX*/
23542372
}
23552373

2374+
/**
2375+
* Return an Observable that emits a 0L after the {@code initialDelay} and ever increasing
2376+
* numbers after each {@code period} of time thereafter, on a specified Scheduler.
2377+
* <p>
2378+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timer.ps.png">
2379+
*
2380+
* @param initialDelay
2381+
* the initial delay time to wait before emitting the first value of 0L
2382+
* @param period
2383+
* the period of time between emissions of the subsequent numbers
2384+
* @param scheduler
2385+
* the scheduler on which the waiting happens and items are emitted
2386+
* @return an Observable that emits a 0L after the { @code initialDelay} and ever increasing
2387+
* numbers after each { @code period} of time thereafter, while running on the given { @code scheduler}
2388+
*/
2389+
def timer(initialDelay: Duration, period: Duration, scheduler: Scheduler): Observable[Long] = {
2390+
toScalaObservable[java.lang.Long](rx.Observable.timer(initialDelay.toNanos, period.toNanos, duration.NANOSECONDS, scheduler)).map(_.longValue())
2391+
}
2392+
23562393
}
23572394

23582395

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Scheduler.scala

Lines changed: 51 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -15,199 +15,107 @@
1515
*/
1616
package rx.lang.scala
1717

18-
import java.util.Date
1918
import scala.concurrent.duration.Duration
20-
import rx.util.functions.{Action0, Action1, Func2}
19+
import rx.util.functions.Action1
2120
import rx.lang.scala.schedulers._
21+
import scala.concurrent.duration
22+
import rx.lang.scala.JavaConversions._
2223

2324
/**
2425
* Represents an object that schedules units of work.
2526
*/
2627
trait Scheduler {
27-
import rx.lang.scala.ImplicitFunctionConversions._
2828

2929
private [scala] val asJavaScheduler: rx.Scheduler
3030

3131
/**
32-
* Schedules a cancelable action to be executed.
32+
* Parallelism available to a Scheduler.
3333
*
34-
* @param action Action to schedule.
35-
* @return a subscription to be able to unsubscribe from action.
36-
*/
37-
def schedule(action: Scheduler => Subscription): Subscription = {
38-
this.schedule[Integer](0, (s: Scheduler, x: Integer) => action(s): Subscription): Subscription
39-
}
40-
41-
/**
42-
* Schedules a cancelable action to be executed.
34+
* This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster.
4335
*
44-
* @param state State to pass into the action.
45-
* @param action Action to schedule.
46-
* @return a subscription to be able to unsubscribe from action.
36+
* @return the scheduler's available degree of parallelism.
4737
*/
48-
private [scala] def schedule[T](state: T, action: (Scheduler, T) => Subscription): Subscription = {
49-
Subscription(asJavaScheduler.schedule(state, new Func2[rx.Scheduler, T, rx.Subscription] {
50-
def call(t1: rx.Scheduler, t2: T): rx.Subscription = {
51-
action(Scheduler(t1), t2).asJavaSubscription
52-
}
53-
}))
54-
}
38+
def degreeOfParallelism: Int = asJavaScheduler.degreeOfParallelism
5539

5640
/**
57-
* Schedules a cancelable action to be executed in delayTime.
58-
*
59-
* @param action Action to schedule.
60-
* @param delayTime Time the action is to be delayed before executing.
61-
* @return a subscription to be able to unsubscribe from action.
41+
* @return the scheduler's notion of current absolute time in milliseconds.
6242
*/
63-
def schedule(delayTime: Duration, action: Scheduler => Subscription): Subscription = {
64-
this.schedule[Integer](0, (s: Scheduler, x: Integer) => action(s), delayTime: Duration): Subscription
65-
}
43+
def now: Long = this.asJavaScheduler.now()
6644

6745
/**
68-
* Schedules a cancelable action to be executed in delayTime.
46+
* Schedules a cancelable action to be executed.
6947
*
70-
* @param state
71-
* State to pass into the action.
72-
* @param action
73-
* Action to schedule.
74-
* @param delayTime
75-
* Time the action is to be delayed before executing.
48+
* @param action Action to schedule.
7649
* @return a subscription to be able to unsubscribe from action.
7750
*/
78-
private [scala] def schedule[T](state: T, action: (Scheduler, T) => Subscription, delayTime: Duration): Subscription = {
79-
Subscription(asJavaScheduler.schedule(state, schedulerActionToFunc2(action), delayTime.length, delayTime.unit))
80-
}
81-
82-
/**
83-
* Schedules a cancelable action to be executed periodically.
84-
* This default implementation schedules recursively and waits for actions to complete (instead of potentially executing
85-
* long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
86-
*
87-
* @param action The action to execute periodically.
88-
* @param initialDelay Time to wait before executing the action for the first time.
89-
* @param period The time interval to wait each time in between executing the action.
90-
* @return A subscription to be able to unsubscribe from action.
91-
*/
92-
def schedule(initialDelay: Duration, period: Duration, action: Scheduler => Subscription): Subscription = {
93-
this.schedulePeriodically[Integer](0, (s: Scheduler, x:Integer) => action(s): Subscription, initialDelay: Duration, period: Duration): Subscription
94-
}
51+
def schedule(action: Inner => Unit): Subscription = this.asJavaScheduler.schedule(action)
9552

9653
/**
9754
* Schedules a cancelable action to be executed periodically.
9855
* This default implementation schedules recursively and waits for actions to complete (instead of potentially executing
9956
* long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
10057
*
101-
* @param state
102-
* State to pass into the action.
10358
* @param action
104-
* The action to execute periodically.
59+
* The action to execute periodically.
10560
* @param initialDelay
106-
* Time to wait before executing the action for the first time.
61+
* Time to wait before executing the action for the first time.
10762
* @param period
108-
* The time interval to wait each time in between executing the action.
63+
* The time interval to wait each time in between executing the action.
10964
* @return A subscription to be able to unsubscribe from action.
11065
*/
111-
private [scala] def schedulePeriodically[T](state: T, action: (Scheduler, T) => Subscription, initialDelay: Duration, period: Duration): Subscription = {
112-
Subscription(asJavaScheduler.schedulePeriodically(state, action, initialDelay.length, initialDelay.unit.convert(period.length, period.unit), initialDelay.unit))
113-
}
114-
115-
/**
116-
* Schedules a cancelable action to be executed at dueTime.
117-
*
118-
* @param action Action to schedule.
119-
* @param dueTime Time the action is to be executed. If in the past it will be executed immediately.
120-
* @return a subscription to be able to unsubscribe from action.
121-
*/
122-
def schedule(dueTime: Date, action: Scheduler => Subscription): Subscription = {
123-
this.schedule(0: Integer, (s: Scheduler, x: Integer) => action(s): Subscription, dueTime: Date): Subscription
124-
}
66+
def schedulePeriodically(action: Inner => Unit, initialDelay: Duration, period: Duration): Subscription =
67+
this.asJavaScheduler.schedulePeriodically (
68+
new Action1[rx.Scheduler.Inner] {
69+
override def call(inner: rx.Scheduler.Inner): Unit = action(javaInnerToScalaInner(inner))
70+
},
71+
initialDelay.toNanos,
72+
period.toNanos,
73+
duration.NANOSECONDS
74+
)
12575

126-
/**
127-
* Schedules a cancelable action to be executed at dueTime.
128-
*
129-
* @param state
130-
* State to pass into the action.
131-
* @param action
132-
* Action to schedule.
133-
* @param dueTime
134-
* Time the action is to be executed. If in the past it will be executed immediately.
135-
* @return a subscription to be able to unsubscribe from action.
136-
*/
137-
private [scala] def schedule[T](state: T, action: (Scheduler, T) => Subscription, dueTime: Date): Subscription = {
138-
Subscription(asJavaScheduler.schedule(state, action, dueTime))
76+
def scheduleRec(work: (=>Unit)=>Unit): Subscription = {
77+
Subscription(asJavaScheduler.schedule(new Action1[rx.Scheduler.Inner] {
78+
override def call(inner: rx.Scheduler.Inner): Unit = work{ inner.schedule(this) }
79+
}))
13980
}
81+
}
14082

141-
/**
142-
* Schedules an action to be executed.
143-
*
144-
* @param action
145-
* action
146-
* @return a subscription to be able to unsubscribe from action.
147-
*/
148-
def schedule(action: =>Unit): Subscription = {
149-
Subscription(asJavaScheduler.schedule(()=>action))
150-
}
83+
object Inner {
84+
def apply(inner: rx.Scheduler.Inner): Inner = new Inner { private[scala] val asJavaInner = inner }
85+
}
15186

152-
/**
153-
* Schedules an action to be executed in delayTime.
154-
*
155-
* @param action action
156-
* @return a subscription to be able to unsubscribe from action.
157-
*/
158-
def schedule(delayTime: Duration, action: =>Unit): Subscription = {
159-
Subscription(asJavaScheduler.schedule(()=>action, delayTime.length, delayTime.unit))
160-
}
87+
trait Inner extends Subscription {
88+
private [scala] val asJavaInner: rx.Scheduler.Inner
16189

16290
/**
163-
* Schedules an action to be executed periodically.
164-
*
165-
* @param action
166-
* The action to execute periodically.
167-
* @param initialDelay
168-
* Time to wait before executing the action for the first time.
169-
* @param period
170-
* The time interval to wait each time in between executing the action.
171-
* @return A subscription to be able to unsubscribe from action.
91+
* Schedules a cancelable action to be executed in delayTime.
17292
*/
173-
def schedule(initialDelay: Duration, period: Duration, action: =>Unit): Subscription = {
174-
Subscription(asJavaScheduler.schedulePeriodically(()=>action, initialDelay.length, initialDelay.unit.convert(period.length, period.unit), initialDelay.unit))
175-
}
176-
177-
def scheduleRec(work: (=>Unit)=>Unit): Subscription = {
178-
Subscription(asJavaScheduler.schedule(new Action1[Action0] {
179-
def call(t1: Action0){
180-
work{ t1.call() }
181-
}
182-
}))
183-
}
93+
def schedule(action: Inner => Unit, delayTime: Duration): Unit =
94+
this.asJavaInner.schedule(
95+
new Action1[rx.Scheduler.Inner] {
96+
override def call(inner: rx.Scheduler.Inner): Unit = action(javaInnerToScalaInner(inner))
97+
},
98+
delayTime.length,
99+
delayTime.unit)
184100

185101
/**
186-
* Returns the scheduler's notion of current absolute time in milliseconds.
102+
* Schedules a cancelable action to be executed immediately.
187103
*/
188-
def now: Long = {
189-
asJavaScheduler.now
190-
}
104+
def schedule(action: Inner=>Unit): Unit = this.asJavaInner.schedule(
105+
new Action1[rx.Scheduler.Inner]{
106+
override def call(inner: rx.Scheduler.Inner): Unit = action(javaInnerToScalaInner(inner))
107+
}
108+
)
191109

192110
/**
193-
* Parallelism available to a Scheduler.
194-
*
195-
* This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster.
196-
*
197-
* @return the scheduler's available degree of parallelism.
111+
* @return the scheduler's notion of current absolute time in milliseconds.
198112
*/
199-
def degreeOfParallelism: Int = {
200-
asJavaScheduler.degreeOfParallelism
201-
}
202-
113+
def now: Long = this.asJavaInner.now()
203114
}
204115

116+
205117
private [scala] object Scheduler {
206118
def apply(scheduler: rx.Scheduler): Scheduler = scheduler match {
207-
case s: rx.schedulers.CurrentThreadScheduler => new CurrentThreadScheduler(s)
208-
case s: rx.schedulers.ExecutorScheduler => new ExecutorScheduler(s)
209-
case s: rx.schedulers.ImmediateScheduler => new ImmediateScheduler(s)
210-
case s: rx.schedulers.NewThreadScheduler => new NewThreadScheduler(s)
211119
case s: rx.schedulers.TestScheduler => new TestScheduler(s)
212120
case s: rx.Scheduler => new Scheduler{ val asJavaScheduler = s }
213121
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package rx.lang.scala.schedulers
2+
3+
import rx.lang.scala.Scheduler
4+
5+
6+
object ComputationScheduler {
7+
/**
8+
* {@link Scheduler} intended for computational work.
9+
* <p>
10+
* This can be used for event-loops, processing callbacks and other computational work.
11+
* <p>
12+
* Do not perform IO-bound work on this scheduler. Use {@link IOScheduler()} instead.
13+
*
14+
* @return { @link Scheduler} for computation-bound work.
15+
*/
16+
def apply(): IOScheduler = {
17+
new IOScheduler(rx.schedulers.Schedulers.computation())
18+
}
19+
}
20+
21+
/**
22+
* Created by netflix on 2/5/14.
23+
*/
24+
class ComputationScheduler private[scala] (val asJavaScheduler: rx.Scheduler)
25+
extends Scheduler {}

0 commit comments

Comments
 (0)