@@ -419,6 +419,7 @@ You can subscribe to subjects from a coroutine just as with any other reactive s
419
419
420
420
<!-- - INCLUDE
421
421
import io.reactivex.subjects.BehaviorSubject
422
+ import kotlinx.coroutines.experimental.Unconfined
422
423
import kotlinx.coroutines.experimental.launch
423
424
import kotlinx.coroutines.experimental.runBlocking
424
425
import kotlinx.coroutines.experimental.rx2.consumeEach
@@ -430,7 +431,7 @@ fun main(args: Array<String>) = runBlocking<Unit> {
430
431
subject.onNext(" one" )
431
432
subject.onNext(" two" )
432
433
// now launch a coroutine to print everything
433
- launch(context ) { // use the context of the main thread for a coroutine
434
+ launch(Unconfined ) { // launch coroutine in unconfined context
434
435
subject.consumeEach { println (it) }
435
436
}
436
437
subject.onNext(" three" )
@@ -440,20 +441,26 @@ fun main(args: Array<String>) = runBlocking<Unit> {
440
441
441
442
> You can get full code [ here] ( kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-07.kt )
442
443
443
- The result is different, though :
444
+ The result the same :
444
445
445
446
``` text
447
+ two
448
+ three
446
449
four
447
450
```
448
451
449
452
<!-- - TEST -->
450
453
451
- It prints only the value value, because the coroutine is working in the main thread, which is busy updating the
452
- subject value. Only when the main thread completes, the subscribing coroutine has a change to print anything. By that
453
- time, the subject had already updated its value to "four" .
454
+ Here we use [ Unconfined ] coroutine context to launch consuming coroutine with the same behaviour as subscription in Rx.
455
+ It basically means that the launched coroutine is going to be immediately executed in the same thread that
456
+ is emitting elements. Contexts are covered in more details in a [ separate section ] ( #coroutine-context ) .
454
457
455
- The coroutines in the main thread are scheduled cooperatively. There is a [ yield] function to explicitly relinquish
456
- the control of the thread to other coroutines. We can add it to the last example:
458
+ The advantage of coroutines is that it is easy to get conflation behavior for single-threaded UI updates.
459
+ A typical UI application does not need to react to every state change. Only the most recent state is relevant.
460
+ A sequence of back-to-back updates to the application state needs to get reflected in UI only once,
461
+ as soon as the UI thread is free. For the following example we are going to simulate this by launching
462
+ consuming coroutine in the context of the main thread and use [ yield] function to simulate a break in the
463
+ sequence of updates and to release the main thread:
457
464
458
465
<!-- - INCLUDE
459
466
import io.reactivex.subjects.BehaviorSubject
@@ -468,33 +475,29 @@ fun main(args: Array<String>) = runBlocking<Unit> {
468
475
val subject = BehaviorSubject .create<String >()
469
476
subject.onNext(" one" )
470
477
subject.onNext(" two" )
471
- // now launch a coroutine to print everything
478
+ // now launch a coroutine to print the most recent update
472
479
launch(context) { // use the context of the main thread for a coroutine
473
480
subject.consumeEach { println (it) }
474
481
}
475
482
subject.onNext(" three" )
476
- yield () // yield the main thread to the launched coroutine <--- HERE
477
483
subject.onNext(" four" )
484
+ yield () // yield the main thread to the launched coroutine <--- HERE
478
485
}
479
486
```
480
487
481
488
> You can get full code [ here] ( kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-08.kt )
482
489
483
- Now coroutine has a chance to process (print) the "three" state of the subject, too :
490
+ Now coroutine process (prints) only the most recent update :
484
491
485
492
``` text
486
- three
487
493
four
488
494
```
489
495
490
496
<!-- - TEST -->
491
497
492
- This is quite the desired behavior for any kind of state-holding variable that needs to processed to update UI or
493
- other linked state, for example. There is no reason to react to back-to-back updates of the state.
494
- Only the most recent state is relevant.
495
-
496
- The corresponding behavior in coroutines world is implemented by [ ConflatedBroadcastChannel] that provides the same logic
497
- on top of coroutine channels directly, without going through the bridge to the reactive streams:
498
+ The corresponding behavior in a pure coroutines world is implemented by [ ConflatedBroadcastChannel]
499
+ that provides the same logic on top of coroutine channels directly,
500
+ without going through the bridge to the reactive streams:
498
501
499
502
<!-- - INCLUDE
500
503
import kotlinx.coroutines.experimental.channels.ConflatedBroadcastChannel
@@ -509,33 +512,32 @@ fun main(args: Array<String>) = runBlocking<Unit> {
509
512
val broadcast = ConflatedBroadcastChannel <String >()
510
513
broadcast.offer(" one" )
511
514
broadcast.offer(" two" )
512
- // now launch a coroutine to print everything
515
+ // now launch a coroutine to print the most recent update
513
516
launch(context) { // use the context of the main thread for a coroutine
514
517
broadcast.consumeEach { println (it) }
515
518
}
516
519
broadcast.offer(" three" )
517
- yield () // yield the main thread to the launched coroutine
518
520
broadcast.offer(" four" )
521
+ yield () // yield the main thread to the launched coroutine
519
522
}
520
523
```
521
524
522
525
> You can get full code [ here] ( kotlinx-coroutines-rx2/src/test/kotlin/guide/example-reactive-basic-09.kt )
523
526
524
- It produces the same output as the version based on ` BehaviorSubject ` :
527
+ It produces the same output as the previous example based on ` BehaviorSubject ` :
525
528
526
529
``` text
527
- three
528
530
four
529
531
```
530
532
533
+ <!-- - TEST -->
534
+
531
535
Another implementation of [ BroadcastChannel] is [ ArrayBroadcastChannel] . It delivers every event to every
532
536
subscriber since the moment the corresponding subscription is open. It corresponds to
533
537
[ PublishSubject] [ http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/subjects/PublishSubject.html ] in Rx.
534
538
The capacity of the buffer in the constructor of ` ArrayBroadcastChannel ` controls the numbers of elements
535
539
that can be sent before the sender is suspended waiting for receiver to receive those elements.
536
540
537
- <!-- - TEST -->
538
-
539
541
## Operators
540
542
541
543
Full-featured reactive stream libraries, like Rx, come with
@@ -1046,11 +1048,11 @@ coroutines for complex pipelines with fan-in and fan-out between multiple worker
1046
1048
<!-- - DOCS_ROOT kotlinx-coroutines-core/target/dokka/kotlinx-coroutines-core -->
1047
1049
<!-- - INDEX kotlinx.coroutines.experimental -->
1048
1050
[ runBlocking ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/run-blocking.html
1051
+ [ Unconfined ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-unconfined/index.html
1049
1052
[ yield ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/yield.html
1050
1053
[ launch ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/launch.html
1051
1054
[ CoroutineScope.context ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/context.html
1052
1055
[ CommonPool ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-common-pool/index.html
1053
- [ Unconfined ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-unconfined/index.html
1054
1056
[ Job.join ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/join.html
1055
1057
<!-- - INDEX kotlinx.coroutines.experimental.channels -->
1056
1058
[ Channel ] : https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html
0 commit comments