Skip to content

Commit 187eddb

Browse files
Merge pull request #1042 from MarioAriasC/master
Kotlin M7 and full compatibility with 0.17.0
2 parents cc17cbd + a26a6d0 commit 187eddb

File tree

5 files changed

+64
-57
lines changed

5 files changed

+64
-57
lines changed

language-adaptors/rxjava-kotlin/README.md

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,37 @@
33
Kotlin has support for SAM (Single Abstract Method) Interfaces as Functions (i.e. Java 8 Lambdas). So you could use Kotlin in RxJava whitout this adaptor
44

55
```kotlin
6-
Observable.create(OnSubscribeFunc<String> {
7-
it!!.onNext("Hello")
8-
it.onCompleted()
6+
Observable.create(OnSubscribeFunc<String> { observer ->
7+
observer!!.onNext("Hello")
8+
observer.onCompleted()
99
Subscriptions.empty()
1010
})!!.subscribe { result ->
1111
a!!.received(result)
1212
}
1313
```
1414

15-
This adaptor exposes a set of Extension functions that allow a more idiomatic Kotlin usage
15+
In RxJava [0.17.0](https://github.com/Netflix/RxJava/releases/tag/0.17.0) version a new Subscriber type was included
1616

1717
```kotlin
18-
import rx.lang.kotlin.*
18+
Observable.create(object:OnSubscribe<String> {
19+
override fun call(subscriber: Subscriber<in String>?) {
20+
subscriber!!.onNext("Hello")
21+
subscriber.onCompleted()
22+
}
23+
})!!.subscribe { result ->
24+
a!!.received(result)
25+
}
26+
```
1927

20-
{(observer: Observer<in String>) ->
21-
observer.onNext("Hello")
22-
observer.onCompleted()
23-
Subscriptions.empty()!!
24-
}.asObservableFunc().subscribe { result ->
28+
(Due to a [bug in Kotlin's compiler](http://youtrack.jetbrains.com/issue/KT-4753) you can't use SAM with OnSubscribe)
29+
30+
This adaptor exposes a set of Extension functions that allow a more idiomatic Kotlin usage
31+
32+
```kotlin
33+
{(subscriber: Subscriber<in String>) ->
34+
subscriber.onNext("Hello")
35+
subscriber.onCompleted()
36+
}.asObservable().subscribe { result ->
2537
a!!.received(result)
2638
}
2739
```

language-adaptors/rxjava-kotlin/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ buildscript {
44
}
55

66
dependencies {
7-
classpath 'org.jetbrains.kotlin:kotlin-gradle-plugin:0.6.1673'
7+
classpath 'org.jetbrains.kotlin:kotlin-gradle-plugin:0.7.270'
88
}
99
}
1010

@@ -13,7 +13,7 @@ apply plugin: 'osgi'
1313

1414
dependencies {
1515
compile project(':rxjava-core')
16-
compile 'org.jetbrains.kotlin:kotlin-stdlib:0.6.1673'
16+
compile 'org.jetbrains.kotlin:kotlin-stdlib:0.7.270'
1717
provided 'junit:junit-dep:4.10'
1818
provided 'org.mockito:mockito-core:1.8.5'
1919
}

language-adaptors/rxjava-kotlin/src/main/kotlin/rx/lang/kotlin/namespace.kt

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,19 @@ import rx.Observable
2121
import rx.Observable.OnSubscribe
2222
import rx.Subscription
2323
import rx.Observable.OnSubscribeFunc
24+
import rx.Subscriber
2425

2526

26-
public fun<T> Function1<Observer<in T>, Unit>.asObservable(): Observable<T> {
27-
return Observable.create(OnSubscribe<T>{ t1 ->
28-
this(t1!!)
27+
public fun<T> Function1<Subscriber<in T>, Unit>.asObservable(): Observable<T> {
28+
return Observable.create(object:OnSubscribe<T> {
29+
override fun call(t1: Subscriber<in T>?) {
30+
this@asObservable(t1!!)
31+
}
32+
2933
})!!
3034
}
3135

36+
[deprecated("Use Function1<Subscriber<in T>, Unit>.asObservable()")]
3237
public fun<T> Function1<Observer<in T>, Subscription>.asObservableFunc(): Observable<T> {
3338
return Observable.create(OnSubscribeFunc<T>{ op ->
3439
this(op!!)

language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/BasicKotlinTests.kt

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@
1616

1717
package rx.lang.kotlin
1818

19-
import org.mockito.Mock
2019
import rx.Observable
21-
import org.junit.Before
22-
import org.mockito.MockitoAnnotations
2320
import org.junit.Test
2421
import rx.subscriptions.Subscriptions
2522
import org.mockito.Mockito.*
@@ -31,21 +28,23 @@ import rx.Subscription
3128
import kotlin.concurrent.thread
3229
import rx.Observable.OnSubscribeFunc
3330
import rx.lang.kotlin.BasicKotlinTests.AsyncObservable
31+
import rx.Observable.OnSubscribe
32+
import rx.Subscriber
3433

3534
/**
3635
* This class use plain Kotlin without extensions from the language adaptor
3736
*/
38-
public class BasicKotlinTests:KotlinTests() {
39-
37+
public class BasicKotlinTests : KotlinTests() {
4038

4139

4240
[Test]
4341
public fun testCreate() {
4442

45-
Observable.create(OnSubscribeFunc<String> {
46-
it!!.onNext("Hello")
47-
it.onCompleted()
48-
Subscriptions.empty()
43+
Observable.create(object:OnSubscribe<String> {
44+
override fun call(subscriber: Subscriber<in String>?) {
45+
subscriber!!.onNext("Hello")
46+
subscriber.onCompleted()
47+
}
4948
})!!.subscribe { result ->
5049
a!!.received(result)
5150
}
@@ -310,7 +309,7 @@ public class BasicKotlinTests:KotlinTests() {
310309

311310

312311

313-
public class TestFactory(){
312+
public class TestFactory() {
314313
var counter = 1
315314

316315
val numbers: Observable<Int>
@@ -330,24 +329,23 @@ public class BasicKotlinTests:KotlinTests() {
330329

331330
}
332331

333-
class AsyncObservable : OnSubscribeFunc<Int>{
334-
override fun onSubscribe(op: Observer<in Int>?): Subscription? {
332+
class AsyncObservable : OnSubscribe<Int> {
333+
override fun call(op: Subscriber<in Int>?) {
335334
thread {
336335
Thread.sleep(50)
337336
op!!.onNext(1)
338337
op.onNext(2)
339338
op.onNext(3)
340339
op.onCompleted()
341340
}
342-
return Subscriptions.empty()
341+
343342
}
344343
}
345344

346-
class TestOnSubscribe(val count: Int) : OnSubscribeFunc<String>{
347-
override fun onSubscribe(op: Observer<in String>?): Subscription? {
345+
class TestOnSubscribe(val count: Int) : OnSubscribe<String> {
346+
override fun call(op: Subscriber<in String>?) {
348347
op!!.onNext("hello_$count")
349348
op.onCompleted()
350-
return Subscriptions.empty()!!
351349
}
352350

353351
}

language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/ExtensionTests.kt

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,14 @@
1616

1717
package rx.lang.kotlin
1818

19-
import org.mockito.Mock
2019
import rx.Observable
21-
import org.junit.Before
22-
import org.mockito.MockitoAnnotations
2320
import org.junit.Test
24-
import rx.subscriptions.Subscriptions
2521
import org.mockito.Mockito.*
2622
import org.mockito.Matchers.*
27-
import rx.Observer
2823
import org.junit.Assert.*
2924
import rx.Notification
30-
import rx.Subscription
3125
import kotlin.concurrent.thread
26+
import rx.Subscriber
3227

3328
/**
3429
* This class contains tests using the extension functions provided by the language adaptor.
@@ -39,11 +34,10 @@ public class ExtensionTests : KotlinTests() {
3934
[Test]
4035
public fun testCreate() {
4136

42-
{(observer: Observer<in String>) ->
43-
observer.onNext("Hello")
44-
observer.onCompleted()
45-
Subscriptions.empty()!!
46-
}.asObservableFunc().subscribe { result ->
37+
{(subscriber: Subscriber<in String>) ->
38+
subscriber.onNext("Hello")
39+
subscriber.onCompleted()
40+
}.asObservable().subscribe { result ->
4741
a!!.received(result)
4842
}
4943

@@ -216,15 +210,15 @@ public class ExtensionTests : KotlinTests() {
216210

217211
[Test]
218212
public fun testForEach() {
219-
asyncObservable.asObservableFunc().toBlockingObservable()!!.forEach(received())
213+
asyncObservable.asObservable().toBlockingObservable()!!.forEach(received())
220214
verify(a, times(1))!!.received(1)
221215
verify(a, times(1))!!.received(2)
222216
verify(a, times(1))!!.received(3)
223217
}
224218

225219
[Test(expected = javaClass<RuntimeException>())]
226220
public fun testForEachWithError() {
227-
asyncObservable.asObservableFunc().toBlockingObservable()!!.forEach { throw RuntimeException("err") }
221+
asyncObservable.asObservable().toBlockingObservable()!!.forEach { throw RuntimeException("err") }
228222
fail("we expect an exception to be thrown")
229223
}
230224

@@ -259,21 +253,19 @@ public class ExtensionTests : KotlinTests() {
259253
assertEquals(listOf(3, 6, 9), values[2])
260254
}
261255

262-
val funOnSubscribe: (Int, Observer<in String>) -> Subscription = { counter, observer ->
263-
observer.onNext("hello_$counter")
264-
observer.onCompleted()
265-
Subscriptions.empty()!!
256+
val funOnSubscribe: (Int, Subscriber<in String>) -> Unit = { counter, subscriber ->
257+
subscriber.onNext("hello_$counter")
258+
subscriber.onCompleted()
266259
}
267260

268-
val asyncObservable: (Observer<in Int>) -> Subscription = { observer ->
261+
val asyncObservable: (Subscriber<in Int>) -> Unit = { subscriber ->
269262
thread {
270263
Thread.sleep(50)
271-
observer.onNext(1)
272-
observer.onNext(2)
273-
observer.onNext(3)
274-
observer.onCompleted()
264+
subscriber.onNext(1)
265+
subscriber.onNext(2)
266+
subscriber.onNext(3)
267+
subscriber.onCompleted()
275268
}
276-
Subscriptions.empty()!!
277269
}
278270

279271
/**
@@ -283,22 +275,22 @@ public class ExtensionTests : KotlinTests() {
283275
return {(p2: P2) -> this(p1, p2) }
284276
}
285277

286-
inner public class TestFactory(){
278+
inner public class TestFactory() {
287279
var counter = 1
288280

289281
val numbers: Observable<Int>
290282
get(){
291283
return listOf(1, 3, 2, 5, 4).asObservable()
292284
}
293285

294-
val onSubscribe: (Observer<in String>) -> Subscription
286+
val onSubscribe: (Subscriber<in String>) -> Unit
295287
get(){
296288
return funOnSubscribe.partially1(counter++)
297289
}
298290

299291
val observable: Observable<String>
300292
get(){
301-
return onSubscribe.asObservableFunc()
293+
return onSubscribe.asObservable()
302294
}
303295

304296
}

0 commit comments

Comments
 (0)