1
+ package com.baeldung.singlerxjavatocoroutinedeferred
2
+
3
+ import io.reactivex.rxjava3.core.Single
4
+ import io.reactivex.rxjava3.schedulers.Schedulers
5
+ import kotlinx.coroutines.*
6
+ import kotlinx.coroutines.rx3.await
7
+ import org.assertj.core.api.Assertions.assertThat
8
+ import org.junit.jupiter.api.Test
9
+ import kotlin.coroutines.resume
10
+ import kotlin.coroutines.resumeWithException
11
+ import kotlin.coroutines.suspendCoroutine
12
+
13
+ class SingleRxJavaToCoroutineDeferredUnitTest {
14
+
15
+ data class Product (val id : Int , val name : String , val price : Double )
16
+
17
+ private val allProducts = listOf (
18
+ Product (1 , " Samsung" , 1200.0 ),
19
+ Product (2 , " Oppo" , 800.0 ),
20
+ Product (3 , " Nokia" , 450.0 ),
21
+ Product (4 , " Lenovo" , 550.0 ),
22
+ Product (5 , " ASUS" , 400.0 )
23
+ )
24
+
25
+ private fun getFilteredProducts (): Single <List <Product >> {
26
+ return Single .just(
27
+ allProducts
28
+ ).map { products ->
29
+ products.sortedBy { it.price }.filter { it.price > 500 }
30
+ }.subscribeOn(Schedulers .io())
31
+ }
32
+
33
+ private suspend fun Deferred <* >.assertOver500AndSorted () {
34
+ assertThat(this .await() as List <* >).containsExactly(
35
+ Product (4 , " Lenovo" , 550.0 ),
36
+ Product (2 , " Oppo" , 800.0 ),
37
+ Product (1 , " Samsung" , 1200.0 )
38
+ )
39
+ }
40
+
41
+ @Test
42
+ fun `using async and blockingGet` () = runBlocking {
43
+ val deferred = async { getFilteredProducts().blockingGet() }
44
+ deferred.assertOver500AndSorted()
45
+ }
46
+
47
+ @Test
48
+ fun `using subscribe and CompletableDeferred` () = runBlocking {
49
+ val deferred = CompletableDeferred <List <Product >>()
50
+ getFilteredProducts().subscribe(deferred::complete, deferred::completeExceptionally)
51
+ deferred.assertOver500AndSorted()
52
+ }
53
+
54
+ @Test
55
+ fun `using suspendCoroutines` (): Unit = runBlocking {
56
+ val deferred = async {
57
+ suspendCoroutine { continuation ->
58
+ getFilteredProducts().subscribe(continuation::resume, continuation::resumeWithException)
59
+ }
60
+ }
61
+ deferred.assertOver500AndSorted()
62
+ }
63
+
64
+ @Test
65
+ fun `using suspendCancellableCoroutine` (): Unit = runBlocking {
66
+ val deferred = async {
67
+ suspendCancellableCoroutine { continuation ->
68
+ getFilteredProducts().subscribe(continuation::resume, continuation::resumeWithException)
69
+ }
70
+ }
71
+ deferred.assertOver500AndSorted()
72
+ }
73
+
74
+ @Test
75
+ fun `using Kotlin Coroutines Rx3` () = runBlocking {
76
+ val deferred = async { getFilteredProducts().await() }
77
+ deferred.assertOver500AndSorted()
78
+ }
79
+
80
+ }
0 commit comments