Skip to content

Commit 50e3221

Browse files
committed
rxFlowable builder for Rx 2.x
1 parent f526b13 commit 50e3221

File tree

4 files changed

+131
-2
lines changed

4 files changed

+131
-2
lines changed

reactive/kotlinx-coroutines-rx2/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ Coroutine builders:
99
| [rxCompletable] | `Completable` | [CoroutineScope] | Cold completable that starts coroutine on subscribe
1010
| [rxSingle] | `Single` | [CoroutineScope] | Cold single that starts coroutine on subscribe
1111
| [rxObservable] | `Observable` | [ProducerScope] | Cold observable that starts coroutine on subscribe
12+
| [rxFlowable] | `Flowable` | [ProducerScope] | Cold observable that starts coroutine on subscribe with **backpressure** support
1213

1314
Suspending extension functions and suspending iteration:
1415

@@ -50,6 +51,7 @@ Conversion functions:
5051
[rxCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-completable.html
5152
[rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-single.html
5253
[rxObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-observable.html
54+
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/rx-flowable.html
5355
[io.reactivex.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-completable-source/await.html
5456
[io.reactivex.SingleSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-single-source/await.html
5557
[io.reactivex.ObservableSource.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.experimental.rx2/io.reactivex.-observable-source/await-first.html
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.rx2
18+
19+
import io.reactivex.Flowable
20+
import kotlinx.coroutines.experimental.channels.ProducerScope
21+
import kotlinx.coroutines.experimental.reactive.publish
22+
import kotlin.coroutines.experimental.CoroutineContext
23+
24+
/**
25+
* Creates cold [flowable][Flowable] that will run a given [block] in a coroutine.
26+
* Every time the returned flowable is subscribed, it starts a new coroutine in the specified [context].
27+
* Coroutine emits items with `send`. Unsubscribing cancels running coroutine.
28+
*
29+
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
30+
* `onNext` is not invoked concurrently.
31+
*
32+
* | **Coroutine action** | **Signal to subscriber**
33+
* | -------------------------------------------- | ------------------------
34+
* | `send` | `onNext`
35+
* | Normal completion or `close` without cause | `onComplete`
36+
* | Failure with exception or `close` with cause | `onError`
37+
*/
38+
public fun <T> rxFlowable(
39+
context: CoroutineContext,
40+
block: suspend ProducerScope<T>.() -> Unit
41+
): Flowable<T> = Flowable.fromPublisher(publish(context, block))

reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ import kotlin.coroutines.experimental.startCoroutine
3838
* Every time the returned observable is subscribed, it starts a new coroutine in the specified [context].
3939
* Coroutine emits items with `send`. Unsubscribing cancels running coroutine.
4040
*
41-
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
42-
* `onNext` is not invoked concurrently.
41+
* Invocations of `send` are suspended appropriately to ensure that `onNext` is not invoked concurrently.
42+
* Note, that Rx 2.x [Observable] **does not support backpressure**. Use [rxFlowable].
4343
*
4444
* | **Coroutine action** | **Signal to subscriber**
4545
* | -------------------------------------------- | ------------------------
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.rx2
18+
19+
import kotlinx.coroutines.experimental.TestBase
20+
import kotlinx.coroutines.experimental.runBlocking
21+
import kotlinx.coroutines.experimental.yield
22+
import org.hamcrest.core.IsEqual
23+
import org.hamcrest.core.IsInstanceOf
24+
import org.junit.Assert
25+
import org.junit.Test
26+
27+
class FlowableTest : TestBase() {
28+
@Test
29+
fun testBasicSuccess() = runBlocking<Unit> {
30+
expect(1)
31+
val observable = rxFlowable(context) {
32+
expect(4)
33+
send("OK")
34+
}
35+
expect(2)
36+
observable.subscribe { value ->
37+
expect(5)
38+
Assert.assertThat(value, IsEqual("OK"))
39+
}
40+
expect(3)
41+
yield() // to started coroutine
42+
finish(6)
43+
}
44+
45+
@Test
46+
fun testBasicFailure() = runBlocking<Unit> {
47+
expect(1)
48+
val observable = rxFlowable<String>(context) {
49+
expect(4)
50+
throw RuntimeException("OK")
51+
}
52+
expect(2)
53+
observable.subscribe({
54+
expectUnreached()
55+
}, { error ->
56+
expect(5)
57+
Assert.assertThat(error, IsInstanceOf(RuntimeException::class.java))
58+
Assert.assertThat(error.message, IsEqual("OK"))
59+
})
60+
expect(3)
61+
yield() // to started coroutine
62+
finish(6)
63+
}
64+
65+
@Test
66+
fun testBasicUnsubscribe() = runBlocking<Unit> {
67+
expect(1)
68+
val observable = rxFlowable<String>(context) {
69+
expect(4)
70+
yield() // back to main, will get cancelled
71+
expectUnreached()
72+
}
73+
expect(2)
74+
val sub = observable.subscribe({
75+
expectUnreached()
76+
}, {
77+
expectUnreached()
78+
})
79+
expect(3)
80+
yield() // to started coroutine
81+
expect(5)
82+
sub.dispose() // will cancel coroutine
83+
yield()
84+
finish(6)
85+
}
86+
}

0 commit comments

Comments
 (0)