Skip to content

Commit f7ab0b3

Browse files
scaladoc for Observer, Subject, Scheduler, and
new Notification class with unapply (scalac crashes)
1 parent 14369fc commit f7ab0b3

File tree

7 files changed

+284
-34
lines changed

7 files changed

+284
-34
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.junit.Assert._
2424
import rx.lang.scala.concurrency.NewThreadScheduler
2525
import rx.lang.scala.util.Timestamped
2626

27-
@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily
27+
//@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily
2828
class RxScalaDemo extends JUnitSuite {
2929

3030
@Test def intervalExample() {
@@ -383,6 +383,17 @@ class RxScalaDemo extends JUnitSuite {
383383
}
384384
}
385385

386+
@Test def materializeExample() {
387+
def printObservable[T](o: Observable[T]): Unit = {
388+
for (n <- o.materialize.toBlockingObservable) n match {
389+
case Notification.OnNext[T](v) => println("Got value " + v)
390+
case Notification.OnCompleted[T]() => println("Completed")
391+
case Notification.OnError[T](err) => println("Error: ")
392+
}
393+
}
394+
val mat = Observable.interval(100 millis).take(3).materialize
395+
}
396+
386397
def output(s: String): Unit = println(s)
387398

388399
// blocks until obs has completed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ object ImplicitFunctionConversions {
2929

3030
implicit def scalaFunction1ToOnSubscribeFunc[T](f: rx.lang.scala.Observer[T] => Subscription) =
3131
new rx.Observable.OnSubscribeFunc[T] {
32-
def onSubscribe(obs: Observer[_ >: T]): rx.Subscription = {
33-
f(obs).asJava
32+
def onSubscribe(obs: rx.Observer[_ >: T]): rx.Subscription = {
33+
f(obs)
3434
}
3535
}
3636

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package rx.lang.scala
2+
3+
sealed trait Notification[+T] {
4+
def asJava: rx.Notification[_ <: T]
5+
}
6+
7+
object Notification {
8+
9+
def apply[T](n: rx.Notification[_ <: T]): Notification[T] = n.getKind match {
10+
case rx.Notification.Kind.OnNext => new OnNext(n)
11+
case rx.Notification.Kind.OnCompleted => new OnCompleted(n)
12+
case rx.Notification.Kind.OnError => new OnError(n)
13+
}
14+
15+
// OnNext, OnError, OnCompleted are not case classes because we don't want pattern matching
16+
// to extract the rx.Notification
17+
18+
class OnNext[+T](val asJava: rx.Notification[_ <: T]) extends Notification[T] {
19+
def value: T = asJava.getValue
20+
def unapply[U](n: Notification[U]): Option[U] = n match {
21+
case n2: OnNext[U] => Some(n.asJava.getValue)
22+
case _ => None
23+
}
24+
}
25+
26+
class OnError[+T](val asJava: rx.Notification[_ <: T]) extends Notification[T] {
27+
def error: Throwable = asJava.getThrowable()
28+
def unapply[U](n: Notification[U]): Option[Throwable] = n match {
29+
case n2: OnError[U] => Some(n2.asJava.getThrowable)
30+
case _ => None
31+
}
32+
}
33+
34+
class OnCompleted[T](val asJava: rx.Notification[_ <: T]) extends Notification[T] {
35+
def unapply[U](n: Notification[U]): Option[Unit] = n match {
36+
case n2: OnCompleted[U] => Some()
37+
case _ => None
38+
}
39+
}
40+
41+
}
42+

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
3030
import scala.concurrent.duration.{Duration, TimeUnit}
3131
import rx.{Observable => JObservable}
3232
import rx.util.functions._
33-
import rx.lang.scala.{Notification, Subscription, Scheduler, Observer}
3433
import rx.lang.scala.util._
3534
import rx.lang.scala.subjects.Subject
3635
import rx.lang.scala.observables.BlockingObservable
@@ -714,7 +713,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
714713
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229453(v=VS.103).aspx">MSDN: Observable.materialize</a>
715714
*/
716715
def materialize: Observable[Notification[T]] = {
717-
Observable[Notification[T]](asJava.materialize())
716+
Observable[rx.Notification[_ <: T]](asJava.materialize()).map(Notification(_))
718717
}
719718

720719
/**
@@ -755,9 +754,11 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
755754
* @return an Observable that emits the items and notifications embedded in the {@link Notification} objects emitted by the source Observable
756755
*/
757756
// with =:= it does not work, why?
758-
def dematerialize[U](implicit evidence: T <:< Notification[U]): Observable[U] = {
759-
val o = asJava.dematerialize[U]()
760-
Observable[U](o)
757+
def dematerialize[U](implicit evidence: Observable[T] <:< Observable[Notification[U]]): Observable[U] = {
758+
val o1: Observable[Notification[U]] = this
759+
val o2: Observable[rx.Notification[_ <: U]] = o1.map(_.asJava)
760+
val o3 = o2.asJava.dematerialize[U]()
761+
Observable[U](o3)
761762
}
762763

763764
/**
@@ -1765,7 +1766,6 @@ object Observable {
17651766
import scala.collection.immutable.Range
17661767
import scala.concurrent.duration.Duration
17671768
import rx.{Observable => JObservable}
1768-
import rx.lang.scala.{Notification, Subscription, Scheduler, Observer}
17691769
import rx.lang.scala.util._
17701770
import rx.util.functions._
17711771
import rx.lang.scala.ImplicitFunctionConversions._
@@ -1923,7 +1923,8 @@ object Observable {
19231923
}
19241924

19251925
def apply[T](f: Future[T], scheduler: Scheduler): Observable[T] = {
1926-
Observable[T](rx.Observable.from(f, scheduler))
1926+
val sched: rx.Scheduler = scheduler
1927+
Observable[T](rx.Observable.from(f, sched))
19271928
}
19281929

19291930
def apply[T](f: Future[T], duration: Duration): Observable[T] = {
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package rx.lang.scala
2+
3+
/**
4+
* Contains special Observables.
5+
*
6+
* In Scala, this package only contains [[rx.lang.scala.observables.BlockingObservable]].
7+
* In the corresponding Java package {{{rx.observables}}}, there is also a
8+
* {{{GroupedObservable}}} and a {{{ConnectableObservable}}}, but these are not needed
9+
* in Scala, because we use a pair {{{(key, observable)}}} instead of {{{GroupedObservable}}}
10+
* and a pair {{{(startFunction, observable)}}} instead of {{{ConnectableObservable}}}.
11+
*/
12+
package object observables {}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala

Lines changed: 192 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,50 +16,221 @@
1616
package rx.lang
1717

1818

19-
/*
19+
/**
2020
* This object contains aliases to all types Scala users need to import.
21+
*
2122
* Note that:
2223
* - Scala users cannot use Java's type with variance without always using writing
2324
* e.g. rx.Notification[_ <: T], so we create aliases fixing the variance
2425
* - For consistency, we create aliases for all types
25-
* - Type aliases cannot be at top level, they have to be inside an object or class
2626
*/
27+
import java.util.concurrent.TimeUnit
28+
import java.util.Date
2729
package object scala {
2830

29-
type Notification[+T] = rx.Notification[_ <: T]
30-
object Notification {
31-
def apply[T](): Notification[T] = new rx.Notification()
32-
def apply[T](value: T): Notification[T] = new rx.Notification(value)
33-
def apply[T](t: Throwable): Notification[T] = new rx.Notification(t)
31+
/*
32+
* Here we're imitating C's preprocessor using Search & Replace.
33+
*
34+
* To activate the code needed to get nice Scaladoc, do the following replacements:
35+
* /*//#ifdef SCALADOC --> //#ifdef SCALADOC
36+
* *///#else --> /*//#else
37+
* //#endif --> *///#endif
38+
*
39+
* To get back to the actual code, undo the above replacements.
40+
*
41+
*/
42+
43+
/*//#ifdef SCALADOC
44+
45+
/**
46+
* Provides a mechanism for receiving push-based notifications.
47+
*
48+
* After an Observer calls an [[rx.lang.scala.Observable]]'s {{{subscribe}}} method, the Observable
49+
* calls the Observer's {{{onNext}}} method to provide notifications. A well-behaved Observable will
50+
* call an Observer's {{{onCompleted}}} method exactly once or the Observer's {{{onError}}} method exactly once.
51+
*/
52+
trait Observer[-T] {
53+
54+
/**
55+
* Notifies the Observer that the [[rx.lang.scala.Observable]] has finished sending push-based notifications.
56+
*
57+
* The [[rx.lang.scala.Observable]] will not call this method if it calls {{{onError}}}.
58+
*/
59+
def onCompleted(): Unit
60+
61+
/**
62+
* Notifies the Observer that the [[rx.lang.scala.Observable]] has experienced an error condition.
63+
*
64+
* If the [[rx.lang.scala.Observable]] calls this method, it will not thereafter call {{{onNext}}} or {{{onCompleted}}}.
65+
*/
66+
def onError(e: Throwable): Unit
67+
68+
/**
69+
* Provides the Observer with new data.
70+
*
71+
* The [[rx.lang.scala.Observable]] calls this closure 0 or more times.
72+
*
73+
* The [[rx.lang.scala.Observable]] will not call this method again after it calls either {{{onCompleted}}} or {{{onError}}}.
74+
*/
75+
def onNext(arg: T): Unit
3476
}
35-
36-
type Observer[-T] = rx.Observer[_ >: T]
37-
type Scheduler = rx.Scheduler
77+
3878
79+
/**
80+
* Represents an object that schedules units of work.
81+
*/
82+
abstract class Scheduler {
83+
84+
/**
85+
* Schedules a cancelable action to be executed.
86+
*
87+
* @param state
88+
* State to pass into the action.
89+
* @param action
90+
* Action to schedule.
91+
* @return a subscription to be able to unsubscribe from action.
92+
*/
93+
def schedule[T](state: T, action: (Scheduler, T) => Subscription): Subscription
94+
95+
/**
96+
* Schedules a cancelable action to be executed in delayTime.
97+
*
98+
* @param state
99+
* State to pass into the action.
100+
* @param action
101+
* Action to schedule.
102+
* @param delayTime
103+
* Time the action is to be delayed before executing.
104+
* @param unit
105+
* Time unit of the delay time.
106+
* @return a subscription to be able to unsubscribe from action.
107+
*/
108+
def schedule[T](state: T, action: (Scheduler, T) => Subscription, delayTime: Long, unit: TimeUnit): Subscription
109+
110+
/**
111+
* Schedules a cancelable action to be executed periodically.
112+
* This default implementation schedules recursively and waits for actions to complete (instead of potentially executing
113+
* long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
114+
*
115+
* @param state
116+
* State to pass into the action.
117+
* @param action
118+
* The action to execute periodically.
119+
* @param initialDelay
120+
* Time to wait before executing the action for the first time.
121+
* @param period
122+
* The time interval to wait each time in between executing the action.
123+
* @param unit
124+
* The time unit the interval above is given in.
125+
* @return A subscription to be able to unsubscribe from action.
126+
*/
127+
def schedulePeriodically[T](state: T, action: (Scheduler, T) => Subscription, initialDelay: Long, period: Long, unit: TimeUnit): Subscription
128+
129+
/**
130+
* Schedules a cancelable action to be executed at dueTime.
131+
*
132+
* @param state
133+
* State to pass into the action.
134+
* @param action
135+
* Action to schedule.
136+
* @param dueTime
137+
* Time the action is to be executed. If in the past it will be executed immediately.
138+
* @return a subscription to be able to unsubscribe from action.
139+
*/
140+
def schedule[T](state: T, action: (Scheduler, T) => Subscription, dueTime: Date): Subscription
141+
142+
/**
143+
* Schedules an action to be executed.
144+
*
145+
* @param action
146+
* action
147+
* @return a subscription to be able to unsubscribe from action.
148+
*/
149+
def schedule(action: () => Unit): Subscription
150+
151+
/**
152+
* Schedules an action to be executed in delayTime.
153+
*
154+
* @param action
155+
* action
156+
* @return a subscription to be able to unsubscribe from action.
157+
*/
158+
def schedule(action: () => Unit, delayTime: Long, unit: TimeUnit): Subscription
159+
160+
/**
161+
* Schedules an action to be executed periodically.
162+
*
163+
* @param action
164+
* The action to execute periodically.
165+
* @param initialDelay
166+
* Time to wait before executing the action for the first time.
167+
* @param period
168+
* The time interval to wait each time in between executing the action.
169+
* @param unit
170+
* The time unit the interval above is given in.
171+
* @return A subscription to be able to unsubscribe from action.
172+
*/
173+
def schedulePeriodically(action: () => Unit, initialDelay: Long, period: Long, unit: TimeUnit): Subscription
174+
175+
/**
176+
* @return the scheduler's notion of current absolute time in milliseconds.
177+
*/
178+
def now(): Long
179+
180+
/**
181+
* Parallelism available to a Scheduler.
182+
* <p>
183+
* This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster.
184+
*
185+
* @return the scheduler's available degree of parallelism.
186+
*/
187+
def degreeOfParallelism: Int
188+
189+
}
190+
39191
/**
40192
* Subscriptions are returned from all Observable.subscribe methods to allow unsubscribing.
41193
*
42-
* This interface is the RxJava equivalent of IDisposable in Microsoft's Rx implementation.
194+
* This interface is the equivalent of IDisposable in the .NET Rx implementation.
43195
*/
44-
implicit class Subscription(val asJava: rx.Subscription) extends AnyVal {
196+
trait Subscription {
45197
/**
46-
* Call this to stop receiving notifications on the Observer that was registered when
198+
* Call this method to stop receiving notifications on the Observer that was registered when
47199
* this Subscription was received.
48200
*/
49-
def unsubscribe(): Unit = asJava.unsubscribe()
201+
def unsubscribe(): Unit
50202
}
203+
204+
private[scala] implicit def fakeSubscription2RxSubscription(s: Subscription): rx.Subscription =
205+
new rx.Subscription {
206+
def unsubscribe() = s.unsubscribe()
207+
}
208+
private[scala] implicit def rxSubscription2FakeSubscription(s: rx.Subscription): Subscription =
209+
new Subscription {
210+
def unsubscribe() = s.unsubscribe()
211+
}
212+
213+
private[scala] implicit def fakeObserver2RxObserver[T](o: Observer[T]): rx.Observer[_ >: T] = ???
214+
private[scala] implicit def rxObserver2fakeObserver[T](o: rx.Observer[_ >: T]): Observer[T] = ???
215+
216+
private[scala] implicit def fakeScheduler2RxScheduler(s: Scheduler): rx.Scheduler = ???
217+
private[scala] implicit def rxScheduler2fakeScheduler(s: rx.Scheduler): Scheduler = ???
218+
219+
*///#else
220+
221+
type Observer[-T] = rx.Observer[_ >: T]
222+
223+
type Scheduler = rx.Scheduler
224+
225+
type Subscription = rx.Subscription
226+
227+
//#endif
51228

52229
}
53230

54231
/*
55232
56-
TODO make aliases for these types because:
57-
* those which are covariant or contravariant do need an alias to get variance correct
58-
* the others for consistency
59-
60-
rx.observables.BlockingObservable
61-
rx.observables.ConnectableObservable
62-
rx.observables.GroupedObservable
233+
These classes are considered unnecessary for Scala users, so we don't create aliases for them:
63234
64235
rx.plugins.RxJavaErrorHandler
65236
rx.plugins.RxJavaObservableExecutionHook
@@ -70,4 +241,3 @@ rx.subscriptions.CompositeSubscription
70241
rx.subscriptions.Subscriptions
71242
72243
*/
73-

0 commit comments

Comments
 (0)