@@ -3,8 +3,11 @@ package com.baeldung.parallelOperationsCollections
3
3
import io.reactivex.Observable
4
4
import io.reactivex.rxkotlin.toObservable
5
5
import io.reactivex.schedulers.Schedulers
6
- import kotlinx.coroutines.*
6
+ import kotlinx.coroutines.ExperimentalCoroutinesApi
7
+ import kotlinx.coroutines.async
8
+ import kotlinx.coroutines.awaitAll
7
9
import kotlinx.coroutines.flow.*
10
+ import kotlinx.coroutines.runBlocking
8
11
import org.assertj.core.api.Assertions.assertThat
9
12
import org.junit.jupiter.api.Test
10
13
import java.text.SimpleDateFormat
@@ -31,60 +34,55 @@ class ParallelOperationCollectionsUnitTest {
31
34
32
35
private fun List<Person>.assertResultsTrue () {
33
36
assertThat(this ).containsExactly(
34
- Person (" Bob" , 16 , false ), Person (" Alice" , 30 , true ), Person (" Charlie" , 40 , true ), Person (" Ahmad" , 42 , true )
37
+ Person (" Bob" , 16 , false ),
38
+ Person (" Alice" , 30 , true ),
39
+ Person (" Charlie" , 40 , true ),
40
+ Person (" Ahmad" , 42 , true )
35
41
)
36
42
}
37
43
38
- private val dateFormat = SimpleDateFormat ( " yyyy-MM-dd:HH:mm:ss:SSS " )
44
+ private val columnScheme = " %-25s %-45s %-40s "
39
45
40
46
private fun Person.printFormattedInfo () {
41
47
println (
42
- " %-30s %-40s %s" .format(
43
- dateFormat.format(Date .from(Instant .now())), Thread .currentThread().name, this
48
+ columnScheme.format(
49
+ SimpleDateFormat (" yyyy-MM-dd:HH:mm:ss:SSS" ).format(Date .from(Instant .now())),
50
+ this .toString(),
51
+ Thread .currentThread().name
44
52
)
45
53
)
46
54
}
47
55
48
- private fun printHeader () {
49
- println (
50
- " %-30s %-40s %s" .format(
51
- " Time" , " Thread name" , " Operation"
52
- )
53
- )
54
- println (" -" .repeat(115 ))
56
+ private fun String.printAsHeader () {
57
+ println (" $this ${" -" .repeat(100 - this .length)} \n ${columnScheme.format(" Time" , " Operation" , " Thread name" )} " )
55
58
}
56
59
57
- private fun Instant.printFooter () {
58
- val endTime = Instant .now()
59
- val duration = Duration .between(this , endTime)
60
- println (" Total time taken: ${duration.toMillis()} ms" )
61
- println ()
60
+ private fun Instant.printTotalTime () {
61
+ println (" Total time taken: ${Duration .between(this , Instant .now()).toMillis()} ms\n " )
62
62
}
63
63
64
64
@Test
65
65
fun `using coroutines for parallel operations` () = runBlocking {
66
- printHeader ()
66
+ " Using Coroutines " .printAsHeader ()
67
67
val startTime = Instant .now()
68
68
69
69
val filteredPeople = people.map { person ->
70
70
async {
71
- launch {
72
- person.isAdult = person.age >= 18
73
- person.printFormattedInfo()
74
- }
71
+ person.isAdult = person.age >= 18
72
+ person.printFormattedInfo()
75
73
person
76
74
}
77
75
}.awaitAll().filter { it.age > 15 }.sortedBy { it.age }
78
76
79
- startTime.printFooter ()
77
+ startTime.printTotalTime ()
80
78
81
79
filteredPeople.assertResultsTrue()
82
80
}
83
81
84
82
@OptIn(ExperimentalCoroutinesApi ::class )
85
83
@Test
86
84
fun `using coroutines for parallel operations with Flow` () = runBlocking {
87
- printHeader ()
85
+ " Using Kotlin Flow " .printAsHeader ()
88
86
val startTime = Instant .now()
89
87
90
88
val filteredPeople = people.asFlow().flatMapMerge { person ->
@@ -97,14 +95,14 @@ class ParallelOperationCollectionsUnitTest {
97
95
}
98
96
}.filter { it.age > 15 }.toList().sortedBy { it.age }
99
97
100
- startTime.printFooter ()
98
+ startTime.printTotalTime ()
101
99
102
100
filteredPeople.assertResultsTrue()
103
101
}
104
102
105
103
@Test
106
104
fun `using RxJava for parallel operations` () { // Observable.class from io.reactivex;
107
- printHeader ()
105
+ " Using RxJava " .printAsHeader ()
108
106
val startTime = Instant .now()
109
107
110
108
val observable = Observable .fromIterable(people).flatMap({
@@ -115,14 +113,14 @@ class ParallelOperationCollectionsUnitTest {
115
113
}, people.size) // Uses maxConcurrency for the number of elements
116
114
.filter { it.age > 15 }.toList().map { it.sortedBy { person -> person.age } }.blockingGet()
117
115
118
- startTime.printFooter ()
116
+ startTime.printTotalTime ()
119
117
120
118
observable.assertResultsTrue()
121
119
}
122
120
123
121
@Test
124
122
fun `using RxKotlin for parallel operations` () { // ObservableKt.kt.class from io.reactivex.rxkotlin
125
- printHeader ()
123
+ " Using RxKotlin " .printAsHeader ()
126
124
val startTime = Instant .now()
127
125
128
126
val observable = people.toObservable().flatMap({
@@ -133,14 +131,14 @@ class ParallelOperationCollectionsUnitTest {
133
131
}, people.size) // Uses maxConcurrency for the number of elements
134
132
.filter { it.age > 15 }.toList().map { it.sortedBy { person -> person.age } }.blockingGet()
135
133
136
- startTime.printFooter ()
134
+ startTime.printTotalTime ()
137
135
138
136
observable.assertResultsTrue()
139
137
}
140
138
141
139
@Test
142
140
fun `using RxKotlin but still use 1 thread` () { // ObservableKt.kt.class from io.reactivex.rxkotlin
143
- printHeader ()
141
+ " Using RxKotlin 1 thread " .printAsHeader ()
144
142
val startTime = Instant .now()
145
143
146
144
val observable =
@@ -149,14 +147,14 @@ class ParallelOperationCollectionsUnitTest {
149
147
person.printFormattedInfo()
150
148
}.filter { it.age > 15 }.toList().map { it.sortedBy { person -> person.age } }.blockingGet()
151
149
152
- startTime.printFooter ()
150
+ startTime.printTotalTime ()
153
151
154
152
observable.assertResultsTrue()
155
153
}
156
154
157
155
@Test
158
156
fun `using parallelStream()` () {
159
- printHeader ()
157
+ " Using Stream API " .printAsHeader ()
160
158
val startTime = Instant .now()
161
159
162
160
val filteredPeople = people.parallelStream().map { person ->
@@ -165,14 +163,14 @@ class ParallelOperationCollectionsUnitTest {
165
163
person
166
164
}.filter { it.age > 15 }.sorted { p1, p2 -> p1.age.compareTo(p2.age) }.collect(Collectors .toList())
167
165
168
- startTime.printFooter ()
166
+ startTime.printTotalTime ()
169
167
170
168
filteredPeople.assertResultsTrue()
171
169
}
172
170
173
171
@Test
174
- fun `using ScheduledExecutorService for parallel operations` () {
175
- printHeader ()
172
+ fun `using ExecutorService for parallel operations` () {
173
+ " Using ExecutorService " .printAsHeader ()
176
174
val startTime = Instant .now()
177
175
178
176
val executor = Executors .newFixedThreadPool(people.size)
@@ -186,7 +184,7 @@ class ParallelOperationCollectionsUnitTest {
186
184
187
185
executor.shutdown()
188
186
189
- startTime.printFooter ()
187
+ startTime.printTotalTime ()
190
188
191
189
futures.assertResultsTrue()
192
190
}
0 commit comments