Skip to content

Commit 75160c7

Browse files
authored
Merge pull request #8 from hangga/rxjava-to-coroutines-rev0
Rxjava to coroutines rev0
2 parents 52a627a + 1422344 commit 75160c7

File tree

1 file changed

+17
-136
lines changed

1 file changed

+17
-136
lines changed

core-kotlin-modules/core-kotlin-concurrency-3/src/test/java/com/baeldung/singlerxjavatocoroutinedeferred/SingleRxJavaToCoroutineDeferredUnitTest.kt

Lines changed: 17 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ import kotlinx.coroutines.*
66
import kotlinx.coroutines.rx3.await
77
import org.assertj.core.api.Assertions.assertThat
88
import org.junit.jupiter.api.Test
9-
import org.slf4j.LoggerFactory
10-
import kotlin.coroutines.CoroutineContext
119
import kotlin.coroutines.resume
1210
import kotlin.coroutines.resumeWithException
1311
import kotlin.coroutines.suspendCoroutine
@@ -16,8 +14,6 @@ import kotlin.test.assertTrue
1614

1715
class SingleRxJavaToCoroutineDeferredUnitTest {
1816

19-
private val logger = LoggerFactory.getLogger("")
20-
2117
data class Product(val id: Int, val name: String, val price: Double)
2218

2319
private val allProducts = listOf(
@@ -47,35 +43,14 @@ class SingleRxJavaToCoroutineDeferredUnitTest {
4743
}
4844

4945
@Test
50-
fun `using async direcly and blockingGet`() = runBlocking {
51-
val deferred = async { getFilteredProducts().blockingGet() } // simple, but must be careful because blocking main thread
46+
fun `using async and blockingGet`() = runBlocking {
47+
val deferred =
48+
async { getFilteredProducts().blockingGet() } // potentially blocking main thread while not careful
5249
deferred.assertOver500AndSorted() // assertion test
5350
}
5451

55-
// using async with extension
56-
private suspend fun <T : Any> Single<T>.toDeferredAsync(): Deferred<T> =
57-
coroutineScope { async { this@toDeferredAsync.blockingGet() } }
58-
59-
60-
@Test
61-
fun `test using async with extension`() = runBlocking {
62-
val deferredExt = getFilteredProducts().toDeferredAsync()
63-
deferredExt.assertOver500AndSorted()
64-
}
65-
66-
// using CoroutineScope(context).async
67-
private fun <T : Any> Single<T>.toDeferredWithContext(context: CoroutineContext): Deferred<T> =
68-
CoroutineScope(context).async { this@toDeferredWithContext.blockingGet() }
69-
70-
@Test
71-
fun `test using CoroutineScope with context and async`(): Unit = runBlocking {
72-
val deferred = getFilteredProducts().toDeferredWithContext(Dispatchers.IO)
73-
deferred.assertOver500AndSorted()
74-
}
75-
76-
// using CompletableDeferred & subscribe
7752
@Test
78-
fun `using CompletableDeferred and subscribe`() = runBlocking {
53+
fun `using subscribe and CompletableDeferred`() = runBlocking {
7954
val deferred = CompletableDeferred<List<Product>>()
8055
getFilteredProducts().subscribe({ products ->
8156
deferred.complete(products)
@@ -85,129 +60,35 @@ class SingleRxJavaToCoroutineDeferredUnitTest {
8560
deferred.assertOver500AndSorted()
8661
}
8762

88-
// using CompletableDeferred extension
89-
private fun <T : Any> Single<T>.toCompletableDeferred(): CompletableDeferred<T> {
90-
val completableDeferred = CompletableDeferred<T>()
91-
this.subscribe({ completableDeferred.complete(it) }, { completableDeferred.completeExceptionally(it) })
92-
return completableDeferred
93-
}
94-
95-
// using CompletableDeferred with custom callback
96-
private fun <T : Any> Single<T>.toCompletableDeferred(
97-
onSuccess: (CompletableDeferred<T>, T) -> Unit, onError: (CompletableDeferred<T>, Throwable) -> Unit
98-
): CompletableDeferred<T> {
99-
val completableDeferred = CompletableDeferred<T>()
100-
this.subscribe({ result ->
101-
completableDeferred.complete(result)
102-
onSuccess(completableDeferred, result)
103-
}, { error ->
104-
completableDeferred.completeExceptionally(error)
105-
onError(completableDeferred, error)
106-
})
107-
return completableDeferred
108-
}
109-
110-
@Test
111-
fun `test using CompletableDeferred extension`() = runBlocking {
112-
val deferred = getFilteredProducts().toCompletableDeferred()
113-
deferred.assertOver500AndSorted()
114-
}
115-
116-
@Test
117-
fun `test using CompletableDeferred with callback`(): Unit = runBlocking {
118-
getFilteredProducts().toCompletableDeferred(onSuccess = { deferredResult, _ ->
119-
launch { deferredResult.assertOver500AndSorted() }
120-
}, onError = { _, error ->
121-
logger.debug("Error: ${error.message}")
122-
}).await()
123-
}
124-
125-
// using suspendCoroutines directly
12663
@Test
127-
fun `using suspendCoroutines directly`(): Unit = runBlocking {
128-
val defered = async {
64+
fun `using suspendCoroutines`(): Unit = runBlocking {
65+
val deferred = async {
12966
suspendCoroutine { continuation ->
13067
getFilteredProducts().subscribe { result ->
13168
continuation.resume(result)
13269
}
13370
}
13471
}
135-
defered.assertOver500AndSorted()
136-
}
137-
138-
// using suspendCoroutines with extension
139-
private suspend fun <T : Any> Single<T>.toDeferredWithSuspend(): Deferred<T> {
140-
return coroutineScope {
141-
async {
142-
suspendCoroutine { continuation ->
143-
this@toDeferredWithSuspend.subscribe { result ->
144-
continuation.resume(result)
145-
}
146-
}
147-
}
148-
}
149-
}
150-
151-
@Test
152-
fun `test using suspendCoroutine extension`() = runBlocking {
153-
val deferred = getFilteredProducts().toDeferredWithSuspend()
15472
deferred.assertOver500AndSorted()
15573
}
15674

157-
// using suspendCancellableCoroutine
158-
private suspend fun <T : Any> Single<T>.toDeferredWithSuspendCancellableCoroutine(
159-
onSuccess: (Deferred<T>) -> Unit, onError: (Throwable) -> Unit
160-
): Deferred<T> {
161-
return coroutineScope {
162-
async {
163-
suspendCancellableCoroutine { continuation ->
164-
this@toDeferredWithSuspendCancellableCoroutine.subscribe({ result ->
165-
val deferredResult = CompletableDeferred<T>().apply {
166-
complete(result)
167-
continuation.resume(result)
168-
}
169-
onSuccess(deferredResult)
170-
}, { error ->
171-
continuation.resumeWithException(error)
172-
onError(error)
173-
})
174-
}
75+
@Test
76+
fun `using suspendCancellableCoroutine`(): Unit = runBlocking {
77+
val deferred = async {
78+
suspendCancellableCoroutine { continuation ->
79+
getFilteredProducts().subscribe({ result ->
80+
continuation.resume(result)
81+
}, { error ->
82+
continuation.resumeWithException(error)
83+
})
17584
}
17685
}
177-
}
178-
@Test
179-
fun `test using suspendCancellableCoroutine with custom callback`(): Unit = runBlocking {
180-
getFilteredProducts().toDeferredWithSuspendCancellableCoroutine(onSuccess = { deferredResult ->
181-
launch { deferredResult.assertOver500AndSorted() }
182-
}, onError = { error ->
183-
logger.debug("Error: ${error.message}")
184-
}).await()
185-
}
186-
187-
// using rx3 directly
188-
@Test
189-
fun `using rx3 directly`() = runBlocking {
190-
val deferred = async { getFilteredProducts().await() }
191-
deferred.assertOver500AndSorted()
192-
}
193-
194-
// using rx3 ext
195-
private suspend fun <T : Any> Single<T>.toDeferredRx3(): Deferred<T> =
196-
coroutineScope { async { this@toDeferredRx3.await() } }
197-
198-
@Test
199-
fun `test using rx3 ext`() = runBlocking {
200-
val deferred = getFilteredProducts().toDeferredRx3()
20186
deferred.assertOver500AndSorted()
20287
}
20388

204-
// using rx3 with context
205-
private fun <T : Any> Single<T>.toDeferredRx3WithContext(context: CoroutineContext): Deferred<T> =
206-
CoroutineScope(context).async { this@toDeferredRx3WithContext.await() }
207-
20889
@Test
209-
fun `test using rx3 with context`(): Unit = runBlocking {
210-
val deferred = getFilteredProducts().toDeferredRx3WithContext(Dispatchers.IO)
90+
fun `using Kotlin Coroutines Rx3`() = runBlocking {
91+
val deferred = async { getFilteredProducts().await() }
21192
deferred.assertOver500AndSorted()
21293
}
21394

0 commit comments

Comments
 (0)