@@ -8,6 +8,9 @@ import kotlinx.coroutines.flow.*
8
8
import org.assertj.core.api.Assertions.assertThat
9
9
import org.junit.jupiter.api.Test
10
10
import java.text.SimpleDateFormat
11
+ import java.time.Duration
12
+ import java.time.Instant
13
+ import java.util.*
11
14
import java.util.concurrent.Callable
12
15
import java.util.concurrent.Executors
13
16
import java.util.stream.Collectors
@@ -26,212 +29,165 @@ class ParallelOperationCollectionsUnitTest {
26
29
Person (" Charlie" , 40 )
27
30
)
28
31
32
+ private fun List<Person>.assertResultsTrue () {
33
+ assertThat(this ).containsExactly(
34
+ Person (" Bob" , 16 , false ), Person (" Alice" , 30 , true ), Person (" Charlie" , 40 , true ), Person (" Ahmad" , 42 , true )
35
+ )
36
+ }
37
+
29
38
private val dateFormat = SimpleDateFormat (" yyyy-MM-dd:HH:mm:ss:SSS" )
30
39
31
- private fun assertResults (filteredPeople : List <Person >) {
32
- assertThat(filteredPeople).containsExactly(
33
- Person (" Bob" , 16 , false ), Person (" Alice" , 30 , true ), Person (" Charlie" , 40 , true ), Person (" Ahmad" , 42 , true )
40
+ private fun Person.printFormattedInfo () {
41
+ println (
42
+ " %-30s %-40s %s" .format(
43
+ dateFormat.format(Date .from(Instant .now())), Thread .currentThread().name, this
44
+ )
34
45
)
35
46
}
36
47
37
- @Test
38
- fun `using coroutines for parallel operations` () = runBlocking {
48
+ private fun printHeader () {
39
49
println (
40
50
" %-30s %-40s %s" .format(
41
51
" Time" , " Thread name" , " Operation"
42
52
)
43
53
)
44
- val startTime = System .currentTimeMillis()
54
+ println (" -" .repeat(115 ))
55
+ }
56
+
57
+ private fun Instant.printFooter () {
58
+ val endTime = Instant .now()
59
+ val duration = Duration .between(this , endTime)
60
+ println (" Total time taken: ${duration.toMillis()} ms" )
61
+ println ()
62
+ }
63
+
64
+ @Test
65
+ fun `using coroutines for parallel operations` () = runBlocking {
66
+ printHeader()
67
+ val startTime = Instant .now()
68
+
45
69
val filteredPeople = people.map { person ->
46
70
async {
47
71
launch {
48
72
person.isAdult = person.age >= 18
49
- println (
50
- " %-30s %-40s %s" .format(
51
- dateFormat.format(System .currentTimeMillis()), Thread .currentThread().name, person
52
- )
53
- )
73
+ person.printFormattedInfo()
54
74
}
55
75
person
56
76
}
57
77
}.awaitAll().filter { it.age > 15 }.sortedBy { it.age }
58
78
59
- val endTime = System .currentTimeMillis()
60
- val duration = endTime - startTime
61
- println (" Total time taken: $duration ms" )
62
- println ()
79
+ startTime.printFooter()
63
80
64
- assertResults( filteredPeople)
81
+ filteredPeople.assertResultsTrue( )
65
82
}
66
83
67
84
@OptIn(ExperimentalCoroutinesApi ::class )
68
85
@Test
69
86
fun `using coroutines for parallel operations with Flow` () = runBlocking {
70
- println (
71
- " %-30s %-40s %s" .format(
72
- " Time" , " Thread name" , " Operation"
73
- )
74
- )
75
- val startTime = System .currentTimeMillis()
87
+ printHeader()
88
+ val startTime = Instant .now()
89
+
76
90
val filteredPeople = people.asFlow().flatMapMerge { person ->
77
91
flow {
78
92
emit(async {
79
93
person.isAdult = person.age >= 18
80
-
81
- println (
82
- " %-30s %-40s %s" .format(
83
- dateFormat.format(System .currentTimeMillis()), Thread .currentThread().name, person
84
- )
85
- )
94
+ person.printFormattedInfo()
86
95
person
87
96
}.await())
88
97
}
89
98
}.filter { it.age > 15 }.toList().sortedBy { it.age }
90
99
91
- val endTime = System .currentTimeMillis()
92
- val duration = endTime - startTime
93
- println (" Total time taken: $duration ms" )
94
- println ()
100
+ startTime.printFooter()
95
101
96
- assertResults( filteredPeople)
102
+ filteredPeople.assertResultsTrue( )
97
103
}
98
104
99
105
@Test
100
106
fun `using RxJava for parallel operations` () { // Observable.class from io.reactivex;
101
- println (
102
- " %-30s %-40s %s" .format(
103
- " Time" , " Thread name" , " Operation"
104
- )
105
- )
106
- val startTime = System .currentTimeMillis()
107
+ printHeader()
108
+ val startTime = Instant .now()
109
+
107
110
val observable = Observable .fromIterable(people).flatMap({
108
111
Observable .just(it).subscribeOn(Schedulers .computation()).doOnNext { person ->
109
112
person.isAdult = person.age >= 18
110
- println (
111
- " %-30s %-40s %s" .format(
112
- dateFormat.format(System .currentTimeMillis()), Thread .currentThread().name, person
113
- )
114
- )
113
+ person.printFormattedInfo()
115
114
}
116
115
}, people.size) // Uses maxConcurrency for the number of elements
117
116
.filter { it.age > 15 }.toList().map { it.sortedBy { person -> person.age } }.blockingGet()
118
117
119
- val endTime = System .currentTimeMillis()
120
- val duration = endTime - startTime
121
- println (" Total time taken: $duration ms" )
122
- println ()
118
+ startTime.printFooter()
123
119
124
- assertResults( observable)
120
+ observable.assertResultsTrue( )
125
121
}
126
122
127
123
@Test
128
124
fun `using RxKotlin for parallel operations` () { // ObservableKt.kt.class from io.reactivex.rxkotlin
129
- println (
130
- " %-30s %-40s %s" .format(
131
- " Time" , " Thread name" , " Operation"
132
- )
133
- )
134
- val startTime = System .currentTimeMillis()
125
+ printHeader()
126
+ val startTime = Instant .now()
127
+
135
128
val observable = people.toObservable().flatMap({
136
129
Observable .just(it).subscribeOn(Schedulers .computation()).doOnNext { person ->
137
130
person.isAdult = person.age >= 18
138
- println (
139
- " %-30s %-40s %s" .format(
140
- dateFormat.format(System .currentTimeMillis()), Thread .currentThread().name, person
141
- )
142
- )
131
+ person.printFormattedInfo()
143
132
}
144
133
}, people.size) // Uses maxConcurrency for the number of elements
145
134
.filter { it.age > 15 }.toList().map { it.sortedBy { person -> person.age } }.blockingGet()
146
135
147
- val endTime = System .currentTimeMillis()
148
- val duration = endTime - startTime
149
- println (" Total time taken: $duration ms" )
150
- println ()
136
+ startTime.printFooter()
151
137
152
- assertResults( observable)
138
+ observable.assertResultsTrue( )
153
139
}
154
140
155
141
@Test
156
142
fun `using RxKotlin but still use 1 thread` () { // ObservableKt.kt.class from io.reactivex.rxkotlin
157
- println (
158
- " %-30s %-40s %s" .format(
159
- " Time" , " Thread name" , " Operation"
160
- )
161
- )
162
- val startTime = System .currentTimeMillis()
143
+ printHeader()
144
+ val startTime = Instant .now()
145
+
163
146
val observable =
164
147
people.toObservable().subscribeOn(Schedulers .io()).flatMap { Observable .just(it) }.doOnNext { person ->
165
148
person.isAdult = person.age >= 18
166
- println (
167
- " %-30s %-40s %s" .format(
168
- dateFormat.format(System .currentTimeMillis()), Thread .currentThread().name, person
169
- )
170
- )
149
+ person.printFormattedInfo()
171
150
}.filter { it.age > 15 }.toList().map { it.sortedBy { person -> person.age } }.blockingGet()
172
151
173
- val endTime = System .currentTimeMillis()
174
- val duration = endTime - startTime
175
- println (" Total time taken: $duration ms" )
176
- println ()
152
+ startTime.printFooter()
177
153
178
- assertResults( observable)
154
+ observable.assertResultsTrue( )
179
155
}
180
156
181
157
@Test
182
158
fun `using parallelStream()` () {
183
- println (
184
- " %-30s %-40s %s" .format(
185
- " Time" , " Thread name" , " Operation"
186
- )
187
- )
188
- val startTime = System .currentTimeMillis()
159
+ printHeader()
160
+ val startTime = Instant .now()
161
+
189
162
val filteredPeople = people.parallelStream().map { person ->
190
163
person.isAdult = person.age >= 18
191
- println (
192
- " %-30s %-40s %s" .format(
193
- dateFormat.format(System .currentTimeMillis()), Thread .currentThread().name, person
194
- )
195
- )
164
+ person.printFormattedInfo()
196
165
person
197
166
}.filter { it.age > 15 }.sorted { p1, p2 -> p1.age.compareTo(p2.age) }.collect(Collectors .toList())
198
167
199
- val endTime = System .currentTimeMillis()
200
- val duration = endTime - startTime
201
- println (" Total time taken: $duration ms" )
202
- println ()
168
+ startTime.printFooter()
203
169
204
- assertResults( filteredPeople)
170
+ filteredPeople.assertResultsTrue( )
205
171
}
206
172
207
173
@Test
208
174
fun `using ScheduledExecutorService for parallel operations` () {
209
- println (
210
- " %-30s %-40s %s" .format(
211
- " Time" , " Thread name" , " Operation"
212
- )
213
- )
214
- val startTime = System .currentTimeMillis()
175
+ printHeader()
176
+ val startTime = Instant .now()
177
+
215
178
val executor = Executors .newFixedThreadPool(people.size)
216
179
val futures = people.map { person ->
217
180
executor.submit(Callable {
218
181
person.isAdult = person.age >= 18
219
- println (
220
- " %-30s %-40s %s" .format(
221
- dateFormat.format(System .currentTimeMillis()), Thread .currentThread().name, person
222
- )
223
- )
182
+ person.printFormattedInfo()
224
183
person
225
184
})
226
185
}.map { it.get() }.filter { it.age > 15 }.sortedBy { it.age }
227
186
228
- val endTime = System .currentTimeMillis()
229
- val duration = endTime - startTime
230
- println (" Total time taken: $duration ms" )
231
- println ()
187
+ executor.shutdown()
232
188
233
- assertResults(futures )
189
+ startTime.printFooter( )
234
190
235
- executor.shutdown ()
191
+ futures.assertResultsTrue ()
236
192
}
237
193
}
0 commit comments