@@ -3,18 +3,16 @@ package com.baeldung.parallelOperationsCollections
3
3
import io.reactivex.rxjava3.core.Observable
4
4
import io.reactivex.rxjava3.kotlin.toObservable
5
5
import io.reactivex.rxjava3.schedulers.Schedulers
6
- import kotlinx.coroutines.ExperimentalCoroutinesApi
7
- import kotlinx.coroutines.async
8
- import kotlinx.coroutines.awaitAll
6
+ import kotlinx.coroutines.*
9
7
import kotlinx.coroutines.flow.*
10
- import kotlinx.coroutines.runBlocking
11
8
import org.assertj.core.api.Assertions.assertThat
12
9
import org.junit.jupiter.api.Test
13
10
import org.slf4j.LoggerFactory
14
11
import java.time.Duration
15
12
import java.time.Instant
16
13
import java.util.concurrent.Callable
17
14
import java.util.concurrent.Executors
15
+ import java.util.concurrent.Future
18
16
import java.util.stream.Collectors
19
17
20
18
@@ -46,7 +44,7 @@ class ParallelOperationCollectionsUnitTest {
46
44
this .isAdult = this .age >= 18
47
45
logger.info(this .toString())
48
46
}
49
-
47
+
50
48
private fun Instant.printTotalTime () {
51
49
val totalTime = Duration .between(this , Instant .now()).toMillis()
52
50
logger.info(" Total time taken: {} ms" , totalTime)
@@ -59,7 +57,7 @@ class ParallelOperationCollectionsUnitTest {
59
57
60
58
val filteredPeople = people
61
59
.map { person ->
62
- async {
60
+ async( Dispatchers . IO ) {
63
61
person.setAdult()
64
62
person
65
63
}
@@ -79,14 +77,11 @@ class ParallelOperationCollectionsUnitTest {
79
77
val startTime = Instant .now()
80
78
81
79
val filteredPeople = people.asFlow()
80
+ .flowOn(Dispatchers .IO )
82
81
.flatMapMerge { person ->
83
82
flow {
84
- emit(
85
- async {
86
- person.setAdult()
87
- person
88
- }.await()
89
- )
83
+ person.setAdult()
84
+ emit(person)
90
85
}
91
86
}
92
87
.filter { it.age > 15 }.toList()
@@ -105,9 +100,11 @@ class ParallelOperationCollectionsUnitTest {
105
100
val observable = Observable .fromIterable(people)
106
101
.flatMap(
107
102
{
108
- Observable .just(it).subscribeOn(Schedulers .computation()).doOnNext { person ->
109
- person.setAdult()
110
- }
103
+ Observable .just(it)
104
+ .subscribeOn(Schedulers .computation())
105
+ .doOnNext { person ->
106
+ person.setAdult()
107
+ }
111
108
}, people.size // Uses maxConcurrency for the number of elements
112
109
)
113
110
.filter { it.age > 15 }
@@ -128,9 +125,11 @@ class ParallelOperationCollectionsUnitTest {
128
125
val observable = people.toObservable()
129
126
.flatMap(
130
127
{
131
- Observable .just(it).subscribeOn(Schedulers .computation()).doOnNext { person ->
132
- person.setAdult()
133
- }
128
+ Observable .just(it)
129
+ .subscribeOn(Schedulers .computation())
130
+ .doOnNext { person ->
131
+ person.setAdult()
132
+ }
134
133
}, people.size // Uses maxConcurrency for the number of elements
135
134
).filter { it.age > 15 }
136
135
.toList()
@@ -166,21 +165,25 @@ class ParallelOperationCollectionsUnitTest {
166
165
val startTime = Instant .now()
167
166
168
167
val executor = Executors .newFixedThreadPool(people.size)
169
- val futures = people
168
+ val futures: List < Future < Person >> = people
170
169
.map { person ->
171
170
executor.submit(Callable {
172
171
person.setAdult()
173
172
person
174
- }).get()
173
+ })
175
174
}
175
+
176
+ val results = futures
177
+ .map { it.get() }
176
178
.filter { it.age > 15 }
177
179
.sortedBy { it.age }
178
180
179
181
executor.shutdown()
180
182
181
183
startTime.printTotalTime()
182
184
183
- futures .assertOver15AndSortedByAge()
185
+ results .assertOver15AndSortedByAge()
184
186
}
187
+
185
188
}
186
189
0 commit comments