@@ -30,149 +30,207 @@ class ParallelOperationCollectionsUnitTest {
30
30
31
31
private fun assertResults (filteredPeople : List <Person >) {
32
32
assertThat(filteredPeople).containsExactly(
33
- Person (" Bob" , 16 , false ),
34
- Person (" Alice" , 30 , true ),
35
- Person (" Charlie" , 40 , true ),
36
- Person (" Ahmad" , 42 , true )
33
+ Person (" Bob" , 16 , false ), Person (" Alice" , 30 , true ), Person (" Charlie" , 40 , true ), Person (" Ahmad" , 42 , true )
37
34
)
38
35
}
39
36
40
37
@Test
41
38
fun `using coroutines for parallel operations` () = runBlocking {
39
+ println (
40
+ " %-30s %-40s %s" .format(
41
+ " Time" , " Thread name" , " Operation"
42
+ )
43
+ )
44
+ val startTime = System .currentTimeMillis()
42
45
val filteredPeople = people.map { person ->
43
46
async {
44
47
launch {
45
48
person.isAdult = person.age >= 18
46
49
println (
47
50
" %-30s %-40s %s" .format(
48
- dateFormat.format(System .currentTimeMillis()),
49
- Thread .currentThread().name,
50
- person
51
+ dateFormat.format(System .currentTimeMillis()), Thread .currentThread().name, person
51
52
)
52
53
)
53
54
}
54
55
person
55
56
}
56
57
}.awaitAll().filter { it.age > 15 }.sortedBy { it.age }
57
58
59
+ val endTime = System .currentTimeMillis()
60
+ val duration = endTime - startTime
61
+ println (" Total time taken: $duration ms" )
62
+ println ()
63
+
58
64
assertResults(filteredPeople)
59
65
}
60
66
61
67
@OptIn(ExperimentalCoroutinesApi ::class )
62
68
@Test
63
69
fun `using coroutines for parallel operations with Flow` () = runBlocking {
70
+ println (
71
+ " %-30s %-40s %s" .format(
72
+ " Time" , " Thread name" , " Operation"
73
+ )
74
+ )
75
+ val startTime = System .currentTimeMillis()
64
76
val filteredPeople = people.asFlow().flatMapMerge { person ->
65
77
flow {
66
78
emit(async {
67
79
person.isAdult = person.age >= 18
68
80
69
81
println (
70
82
" %-30s %-40s %s" .format(
71
- dateFormat.format(System .currentTimeMillis()),
72
- Thread .currentThread().name,
73
- person
83
+ dateFormat.format(System .currentTimeMillis()), Thread .currentThread().name, person
74
84
)
75
85
)
76
86
person
77
87
}.await())
78
88
}
79
89
}.filter { it.age > 15 }.toList().sortedBy { it.age }
80
90
91
+ val endTime = System .currentTimeMillis()
92
+ val duration = endTime - startTime
93
+ println (" Total time taken: $duration ms" )
94
+ println ()
95
+
81
96
assertResults(filteredPeople)
82
97
}
83
98
84
99
@Test
85
100
fun `using RxJava for parallel operations` () { // Observable.class from io.reactivex;
101
+ println (
102
+ " %-30s %-40s %s" .format(
103
+ " Time" , " Thread name" , " Operation"
104
+ )
105
+ )
106
+ val startTime = System .currentTimeMillis()
86
107
val observable = Observable .fromIterable(people).flatMap({
87
108
Observable .just(it).subscribeOn(Schedulers .computation()).doOnNext { person ->
88
109
person.isAdult = person.age >= 18
89
110
println (
90
111
" %-30s %-40s %s" .format(
91
- dateFormat.format(System .currentTimeMillis()),
92
- Thread .currentThread().name,
93
- person
112
+ dateFormat.format(System .currentTimeMillis()), Thread .currentThread().name, person
94
113
)
95
114
)
96
115
}
97
116
}, people.size) // Uses maxConcurrency for the number of elements
98
117
.filter { it.age > 15 }.toList().map { it.sortedBy { person -> person.age } }.blockingGet()
99
118
119
+ val endTime = System .currentTimeMillis()
120
+ val duration = endTime - startTime
121
+ println (" Total time taken: $duration ms" )
122
+ println ()
123
+
100
124
assertResults(observable)
101
125
}
102
126
103
127
@Test
104
128
fun `using RxKotlin for parallel operations` () { // ObservableKt.kt.class from io.reactivex.rxkotlin
129
+ println (
130
+ " %-30s %-40s %s" .format(
131
+ " Time" , " Thread name" , " Operation"
132
+ )
133
+ )
134
+ val startTime = System .currentTimeMillis()
105
135
val observable = people.toObservable().flatMap({
106
136
Observable .just(it).subscribeOn(Schedulers .computation()).doOnNext { person ->
107
137
person.isAdult = person.age >= 18
108
138
println (
109
139
" %-30s %-40s %s" .format(
110
- dateFormat.format(System .currentTimeMillis()),
111
- Thread .currentThread().name,
112
- person
140
+ dateFormat.format(System .currentTimeMillis()), Thread .currentThread().name, person
113
141
)
114
142
)
115
143
}
116
144
}, people.size) // Uses maxConcurrency for the number of elements
117
145
.filter { it.age > 15 }.toList().map { it.sortedBy { person -> person.age } }.blockingGet()
118
146
147
+ val endTime = System .currentTimeMillis()
148
+ val duration = endTime - startTime
149
+ println (" Total time taken: $duration ms" )
150
+ println ()
151
+
119
152
assertResults(observable)
120
153
}
121
154
122
155
@Test
123
156
fun `using RxKotlin but still use 1 thread` () { // ObservableKt.kt.class from io.reactivex.rxkotlin
157
+ println (
158
+ " %-30s %-40s %s" .format(
159
+ " Time" , " Thread name" , " Operation"
160
+ )
161
+ )
162
+ val startTime = System .currentTimeMillis()
124
163
val observable =
125
164
people.toObservable().subscribeOn(Schedulers .io()).flatMap { Observable .just(it) }.doOnNext { person ->
126
165
person.isAdult = person.age >= 18
127
166
println (
128
167
" %-30s %-40s %s" .format(
129
- dateFormat.format(System .currentTimeMillis()),
130
- Thread .currentThread().name,
131
- person
168
+ dateFormat.format(System .currentTimeMillis()), Thread .currentThread().name, person
132
169
)
133
170
)
134
171
}.filter { it.age > 15 }.toList().map { it.sortedBy { person -> person.age } }.blockingGet()
135
172
173
+ val endTime = System .currentTimeMillis()
174
+ val duration = endTime - startTime
175
+ println (" Total time taken: $duration ms" )
176
+ println ()
177
+
136
178
assertResults(observable)
137
179
}
138
180
139
181
@Test
140
182
fun `using parallelStream()` () {
183
+ println (
184
+ " %-30s %-40s %s" .format(
185
+ " Time" , " Thread name" , " Operation"
186
+ )
187
+ )
188
+ val startTime = System .currentTimeMillis()
141
189
val filteredPeople = people.parallelStream().map { person ->
142
190
person.isAdult = person.age >= 18
143
191
println (
144
192
" %-30s %-40s %s" .format(
145
- dateFormat.format(System .currentTimeMillis()),
146
- Thread .currentThread().name,
147
- person
193
+ dateFormat.format(System .currentTimeMillis()), Thread .currentThread().name, person
148
194
)
149
195
)
150
196
person
151
197
}.filter { it.age > 15 }.sorted { p1, p2 -> p1.age.compareTo(p2.age) }.collect(Collectors .toList())
152
198
199
+ val endTime = System .currentTimeMillis()
200
+ val duration = endTime - startTime
201
+ println (" Total time taken: $duration ms" )
202
+ println ()
203
+
153
204
assertResults(filteredPeople)
154
205
}
155
206
156
207
@Test
157
208
fun `using ScheduledExecutorService for parallel operations` () {
158
- val executor = Executors .newCachedThreadPool()
209
+ println (
210
+ " %-30s %-40s %s" .format(
211
+ " Time" , " Thread name" , " Operation"
212
+ )
213
+ )
214
+ val startTime = System .currentTimeMillis()
215
+ val executor = Executors .newFixedThreadPool(people.size)
159
216
val futures = people.map { person ->
160
217
executor.submit(Callable {
161
218
person.isAdult = person.age >= 18
162
219
println (
163
220
" %-30s %-40s %s" .format(
164
- dateFormat.format(System .currentTimeMillis()),
165
- Thread .currentThread().name,
166
- person
221
+ dateFormat.format(System .currentTimeMillis()), Thread .currentThread().name, person
167
222
)
168
223
)
169
224
person
170
225
})
171
- }
226
+ }.map { it.get() }.filter { it.age > 15 }.sortedBy { it.age }
172
227
173
- val filteredPeople = futures.map { it.get() }.filter { it.age > 15 }.sortedBy { it.age }
228
+ val endTime = System .currentTimeMillis()
229
+ val duration = endTime - startTime
230
+ println (" Total time taken: $duration ms" )
231
+ println ()
174
232
175
- assertResults(filteredPeople )
233
+ assertResults(futures )
176
234
177
235
executor.shutdown()
178
236
}
0 commit comments