Skip to content

Commit 6361f3e

Browse files
committed
Implement 'create' in RxScala
1 parent aa374e5 commit 6361f3e

File tree

2 files changed

+22
-5
lines changed

2 files changed

+22
-5
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4018,13 +4018,15 @@ object Observable {
40184018
* @return
40194019
* an Observable that, when an [[rx.lang.scala.Observer]] subscribes to it, will execute the given function.
40204020
*/
4021-
@deprecated("Use `apply[T](Subscriber[T] => Unit)` instead", "0.17.0")
40224021
def create[T](func: Observer[T] => Subscription): Observable[T] = {
4023-
toScalaObservable[T](rx.Observable.create(new OnSubscribeFunc[T] {
4024-
def onSubscribe(t1: rx.Observer[_ >: T]): rx.Subscription = {
4025-
func(Observer(t1))
4022+
Observable(
4023+
(subscriber: Subscriber[T]) => {
4024+
val s = func(subscriber)
4025+
if (s != null && s != subscriber) {
4026+
subscriber.add(s)
4027+
}
40264028
}
4027-
}))
4029+
)
40284030
}
40294031

40304032
/*

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,4 +271,19 @@ class ObservableTests extends JUnitSuite {
271271
assertEquals(expected, r)
272272
}
273273

274+
@Test
275+
def testCreate() {
276+
var called = false
277+
val o = Observable.create[String](observer => {
278+
observer.onNext("a")
279+
observer.onNext("b")
280+
observer.onNext("c")
281+
observer.onCompleted()
282+
Subscription {
283+
called = true
284+
}
285+
})
286+
assertEquals(List("a", "b", "c"), o.toBlocking.toList)
287+
assertTrue(called)
288+
}
274289
}

0 commit comments

Comments
 (0)