Skip to content

Commit c0db4d3

Browse files
rocketramanelizarov
authored andcommitted
Clarify RX subscription and cancellation example
Clarify that `cancel` is called automatically rather than needing to be called manually. Demonstrate stream "completion".
1 parent f4e14b9 commit c0db4d3

File tree

4 files changed

+17
-12
lines changed

4 files changed

+17
-12
lines changed

reactive/coroutines-guide-reactive.md

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -245,14 +245,15 @@ import kotlinx.coroutines.experimental.reactive.*
245245
fun main(args: Array<String>) = runBlocking<Unit> {
246246
val source = Flowable.range(1, 5) // a range of five numbers
247247
.doOnSubscribe { println("OnSubscribe") } // provide some insight
248+
.doOnComplete { println("OnComplete") } // ...
248249
.doFinally { println("Finally") } // ... into what's going on
249250
var cnt = 0
250251
source.openSubscription().consume { // open channel to the source
251252
for (x in this) { // iterate over the channel to receive elements from it
252253
println(x)
253254
if (++cnt >= 3) break // break when 3 elements are printed
254255
}
255-
// `use` will close the channel when this block of code is complete
256+
// Note: `consume` cancels the channel when this block of code is complete
256257
}
257258
}
258259
```
@@ -271,16 +272,16 @@ Finally
271272

272273
<!--- TEST -->
273274

274-
With an explicit `openSubscription` we should [close][SubscriptionReceiveChannel.close] the corresponding
275-
subscription to unsubscribe from the source. However, instead of invoking `close` explicitly,
276-
this code relies on [use](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.io/use.html)
277-
function from Kotlin's standard library.
275+
With an explicit `openSubscription` we should [cancel][SubscriptionReceiveChannel.cancel] the corresponding
276+
subscription to unsubscribe from the source. There is no need to invoke `cancel` explicitly -- under the hood
277+
`consume` does that for us.
278278
The installed
279279
[doFinally](http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#doFinally(io.reactivex.functions.Action))
280-
listener prints "Finally" to confirm that the subscription is actually being closed.
281-
282-
We do not need to use an explicit `close` if iteration is performed over all the items that are emitted
283-
by the publisher, because it is being closed automatically by `consumeEach`:
280+
listener prints "Finally" to confirm that the subscription is actually being closed. Note that "OnComplete"
281+
is never printed because we did not consume all of the elements.
282+
283+
We do not need to use an explicit `cancel` either if iteration is performed over all the items that are emitted
284+
by the publisher, because it is being cancelled automatically by `consumeEach`:
284285

285286
<!--- INCLUDE
286287
import io.reactivex.*
@@ -293,6 +294,7 @@ import kotlin.coroutines.experimental.*
293294
fun main(args: Array<String>) = runBlocking<Unit> {
294295
val source = Flowable.range(1, 5) // a range of five numbers
295296
.doOnSubscribe { println("OnSubscribe") } // provide some insight
297+
.doOnComplete { println("OnComplete") } // ...
296298
.doFinally { println("Finally") } // ... into what's going on
297299
// iterate over the source fully
298300
source.consumeEach { println(it) }
@@ -309,13 +311,14 @@ OnSubscribe
309311
2
310312
3
311313
4
314+
OnComplete
312315
Finally
313316
5
314317
```
315318

316319
<!--- TEST -->
317320

318-
Notice, how "Finally" is printed before the last element "5". It happens because our `main` function in this
321+
Notice, how "OnComplete" and "Finally" are printed before the last element "5". It happens because our `main` function in this
319322
example is a coroutine that we start with [runBlocking] coroutine builder.
320323
Our main coroutine receives on the channel using `source.consumeEach { ... }` expression.
321324
The main coroutine is _suspended_ while it waits for the source to emit an item.
@@ -1075,7 +1078,6 @@ coroutines for complex pipelines with fan-in and fan-out between multiple worker
10751078
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
10761079
[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/consume-each.html
10771080
[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/index.html
1078-
[SubscriptionReceiveChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-subscription-receive-channel/close.html
10791081
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html
10801082
[BroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-broadcast-channel/index.html
10811083
[ConflatedBroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-conflated-broadcast-channel/index.html

reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-03.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,14 @@ import kotlinx.coroutines.experimental.reactive.*
2525
fun main(args: Array<String>) = runBlocking<Unit> {
2626
val source = Flowable.range(1, 5) // a range of five numbers
2727
.doOnSubscribe { println("OnSubscribe") } // provide some insight
28+
.doOnComplete { println("OnComplete") } // ...
2829
.doFinally { println("Finally") } // ... into what's going on
2930
var cnt = 0
3031
source.openSubscription().consume { // open channel to the source
3132
for (x in this) { // iterate over the channel to receive elements from it
3233
println(x)
3334
if (++cnt >= 3) break // break when 3 elements are printed
3435
}
35-
// `use` will close the channel when this block of code is complete
36+
// Note: `consume` cancels the channel when this block of code is complete
3637
}
3738
}

reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-04.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import kotlin.coroutines.experimental.*
2525
fun main(args: Array<String>) = runBlocking<Unit> {
2626
val source = Flowable.range(1, 5) // a range of five numbers
2727
.doOnSubscribe { println("OnSubscribe") } // provide some insight
28+
.doOnComplete { println("OnComplete") } // ...
2829
.doFinally { println("Finally") } // ... into what's going on
2930
// iterate over the source fully
3031
source.consumeEach { println(it) }

reactive/kotlinx-coroutines-rx2/src/test/kotlin/guide/test/GuideReactiveTest.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class GuideReactiveTest : ReactiveTestBase() {
5252
"2",
5353
"3",
5454
"4",
55+
"OnComplete",
5556
"Finally",
5657
"5"
5758
)

0 commit comments

Comments
 (0)