@@ -321,15 +321,15 @@ _suspend_ and they provide a natural answer to handling backpressure.
321
321
In Rx Java 2.x a backpressure-capable class is called
322
322
[ Flowable] ( http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html ) .
323
323
In the following example we use [ rxFlowable] coroutine builder from ` kotlinx-coroutines-rx2 ` module to define a
324
- flowable that sends five integers from 1 to 5 .
324
+ flowable that sends three integers from 1 to 3 .
325
325
It prints a message to the output before invocation of
326
326
suspending [ send] [ SendChannel.send ] function, so that we can study how it operates.
327
327
328
328
The integers are generated in the context of the main thread, but subscription is shifted
329
329
to another thread using Rx
330
330
[ observeOn] ( http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#observeOn(io.reactivex.Scheduler,%20boolean,%20int) )
331
331
operator with a buffer of size 1.
332
- The subscriber is slow. It takes 200 ms to process each item, which is simulated using ` Thread.sleep ` .
332
+ The subscriber is slow. It takes 500 ms to process each item, which is simulated using ` Thread.sleep ` .
333
333
334
334
<!-- - INCLUDE
335
335
import kotlinx.coroutines.experimental.*
@@ -341,20 +341,20 @@ import io.reactivex.schedulers.Schedulers
341
341
fun main (args : Array <String >) = runBlocking<Unit > {
342
342
// coroutine -- fast producer of elements in the context of the main thread
343
343
val source = rxFlowable(context) {
344
- for (x in 1 .. 5 ) {
345
- println (" Sending $x ..." )
344
+ for (x in 1 .. 3 ) {
346
345
send(x) // this is a suspending function
346
+ println (" Sent $x " ) // print after successfully sent item
347
347
}
348
348
}
349
349
// subscribe on another thread with a slow subscriber using Rx
350
350
source
351
351
.observeOn(Schedulers .io(), false , 1 ) // specify buffer size of 1 item
352
352
.doOnComplete { println (" Complete" ) }
353
353
.subscribe { x ->
354
- println ( " Received $x " )
355
- Thread .sleep( 300 ) // 300 ms to process each item
354
+ Thread .sleep( 500 ) // 500ms to process each item
355
+ println ( " Processed $x " )
356
356
}
357
- delay(2000 ) // suspend main thread for couple of seconds
357
+ delay(2000 ) // suspend the main thread for a few seconds
358
358
}
359
359
```
360
360
@@ -363,23 +363,19 @@ fun main(args: Array<String>) = runBlocking<Unit> {
363
363
The output of this code nicely illustrates how backpressure works with coroutines:
364
364
365
365
``` text
366
- Sending 1 ...
367
- Sending 2 ...
368
- Received 1
369
- Sending 3 ...
370
- Received 2
371
- Sending 4 ...
372
- Received 3
373
- Sending 5 ...
374
- Received 4
375
- Received 5
366
+ Sent 1
367
+ Processed 1
368
+ Sent 2
369
+ Processed 2
370
+ Sent 3
371
+ Processed 3
376
372
Complete
377
373
```
378
374
379
375
<!-- - TEST -->
380
376
381
377
We see here how producer coroutine puts the first element in the buffer and is suspended while trying to send another
382
- one. Only after consumer receives the first item, the sender resumes to produce more .
378
+ one. Only after consumer processes the first item, producer sends the second one and resumes, etc .
383
379
384
380
385
381
### Rx Subject vs BroadcastChannel
0 commit comments