@@ -10,17 +10,18 @@ import kotlinx.coroutines.flow.*
10
10
import kotlinx.coroutines.runBlocking
11
11
import org.assertj.core.api.Assertions.assertThat
12
12
import org.junit.jupiter.api.Test
13
- import java.text.SimpleDateFormat
13
+ import org.slf4j.LoggerFactory
14
14
import java.time.Duration
15
15
import java.time.Instant
16
- import java.util.*
17
16
import java.util.concurrent.Callable
18
17
import java.util.concurrent.Executors
19
18
import java.util.stream.Collectors
20
19
21
20
22
21
class ParallelOperationCollectionsUnitTest {
23
22
23
+ private val logger = LoggerFactory .getLogger(" " )
24
+
24
25
data class Person (val name : String , val age : Int , var isAdult : Boolean? = null )
25
26
26
27
private val people = listOf (
@@ -34,31 +35,28 @@ class ParallelOperationCollectionsUnitTest {
34
35
35
36
private fun List<Person>.assertResultsTrue () {
36
37
assertThat(this ).containsExactly(
37
- Person (" Bob" , 16 , false ),
38
- Person (" Alice" , 30 , true ),
39
- Person (" Charlie" , 40 , true ),
40
- Person (" Ahmad" , 42 , true )
41
- )
42
- }
43
-
44
- private fun Person.setAdult (){
45
- this .isAdult = this .age >= 18
46
-
47
- println (
48
- " %-25s %-45s %-40s" .format(
49
- SimpleDateFormat (" yyyy-MM-dd:HH:mm:ss:SSS" ).format(Date .from(Instant .now())),
50
- this .toString(),
51
- Thread .currentThread().name
38
+ Person (" Bob" , 16 , false ),
39
+ Person (" Alice" , 30 , true ),
40
+ Person (" Charlie" , 40 , true ),
41
+ Person (" Ahmad" , 42 , true )
52
42
)
53
- )
54
43
}
55
44
56
45
private fun String.printAsHeader () {
57
- println (" $this ${" -" .repeat(100 - this .length)} \n ${" %-25s %-45s %-40s" .format(" Time" , " Operation" , " Thread name" )} " )
46
+ logger.info(" {} {}" , " -" .repeat(32 - Thread .currentThread().name.length), this )
47
+ }
48
+
49
+ private fun Person.setAdult () {
50
+ this .isAdult = this .age >= 18
51
+ val line = " " .repeat(32 - Thread .currentThread().name.length)
52
+ logger.info(" {} {}" , line, this )
58
53
}
59
54
60
55
private fun Instant.printTotalTime () {
61
- println (" Total time taken: ${Duration .between(this , Instant .now()).toMillis()} ms\n " )
56
+ val totalTime = Duration .between(this , Instant .now()).toMillis()
57
+ logger.info(
58
+ " {} Total time taken: {} ms \n " , " -" .repeat(32 - Thread .currentThread().name.length), totalTime
59
+ )
62
60
}
63
61
64
62
@Test
@@ -67,11 +65,11 @@ class ParallelOperationCollectionsUnitTest {
67
65
val startTime = Instant .now()
68
66
69
67
val filteredPeople = people.map { person ->
70
- async {
71
- person.setAdult()
72
- person
73
- }
74
- }.awaitAll().filter { it.age > 15 }.sortedBy { it.age }
68
+ async {
69
+ person.setAdult()
70
+ person
71
+ }
72
+ }.awaitAll().filter { it.age > 15 }.sortedBy { it.age }
75
73
76
74
startTime.printTotalTime()
77
75
@@ -85,13 +83,15 @@ class ParallelOperationCollectionsUnitTest {
85
83
val startTime = Instant .now()
86
84
87
85
val filteredPeople = people.asFlow().flatMapMerge { person ->
88
- flow {
89
- emit(async {
90
- person.setAdult()
91
- person
92
- }.await())
93
- }
94
- }.filter { it.age > 15 }.toList().sortedBy { it.age }
86
+ flow {
87
+ emit(
88
+ async {
89
+ person.setAdult()
90
+ person
91
+ }.await()
92
+ )
93
+ }
94
+ }.filter { it.age > 15 }.toList().sortedBy { it.age }
95
95
96
96
startTime.printTotalTime()
97
97
@@ -103,12 +103,17 @@ class ParallelOperationCollectionsUnitTest {
103
103
" Using RxJava" .printAsHeader()
104
104
val startTime = Instant .now()
105
105
106
- val observable = Observable .fromIterable(people).flatMap({
107
- Observable .just(it).subscribeOn(Schedulers .computation()).doOnNext { person ->
108
- person.setAdult()
109
- }
110
- }, people.size) // Uses maxConcurrency for the number of elements
111
- .filter { it.age > 15 }.toList().map { it.sortedBy { person -> person.age } }.blockingGet()
106
+ val observable = Observable .fromIterable(people).flatMap(
107
+ {
108
+ Observable .just(it).subscribeOn(Schedulers .computation()).doOnNext { person ->
109
+ person.setAdult()
110
+ }
111
+ }, people.size // Uses maxConcurrency for the number of elements
112
+ )
113
+ .filter { it.age > 15 }
114
+ .toList()
115
+ .map { it.sortedBy { person -> person.age } }
116
+ .blockingGet()
112
117
113
118
startTime.printTotalTime()
114
119
@@ -120,12 +125,16 @@ class ParallelOperationCollectionsUnitTest {
120
125
" Using RxKotlin" .printAsHeader()
121
126
val startTime = Instant .now()
122
127
123
- val observable = people.toObservable().flatMap({
124
- Observable .just(it).subscribeOn(Schedulers .computation()).doOnNext { person ->
125
- person.setAdult()
126
- }
127
- }, people.size) // Uses maxConcurrency for the number of elements
128
- .filter { it.age > 15 }.toList().map { it.sortedBy { person -> person.age } }.blockingGet()
128
+ val observable = people.toObservable().flatMap(
129
+ {
130
+ Observable .just(it).subscribeOn(Schedulers .computation()).doOnNext { person ->
131
+ person.setAdult()
132
+ }
133
+ }, people.size // Uses maxConcurrency for the number of elements
134
+ ).filter { it.age > 15 }
135
+ .toList()
136
+ .map { it.sortedBy { person -> person.age } }
137
+ .blockingGet()
129
138
130
139
startTime.printTotalTime()
131
140
@@ -137,10 +146,12 @@ class ParallelOperationCollectionsUnitTest {
137
146
" Using RxKotlin 1 thread" .printAsHeader()
138
147
val startTime = Instant .now()
139
148
140
- val observable =
141
- people.toObservable().subscribeOn(Schedulers .io()).flatMap { Observable .just(it) }.doOnNext { person ->
142
- person.setAdult()
143
- }.filter { it.age > 15 }.toList().map { it.sortedBy { person -> person.age } }.blockingGet()
149
+ val observable = people.toObservable()
150
+ .subscribeOn(Schedulers .io())
151
+ .flatMap { Observable .just(it) }
152
+ .doOnNext { person -> person.setAdult() }
153
+ .filter { it.age > 15 }.toList()
154
+ .map { it.sortedBy { person -> person.age } }.blockingGet()
144
155
145
156
startTime.printTotalTime()
146
157
@@ -153,10 +164,11 @@ class ParallelOperationCollectionsUnitTest {
153
164
val startTime = Instant .now()
154
165
155
166
val filteredPeople = people.parallelStream().map { person ->
156
-
157
- person.setAdult()
158
- person
159
- }.filter { it.age > 15 }.sorted { p1, p2 -> p1.age.compareTo(p2.age) }.collect(Collectors .toList())
167
+ person.setAdult()
168
+ person
169
+ }.filter { it.age > 15 }
170
+ .sorted { p1, p2 -> p1.age.compareTo(p2.age) }
171
+ .collect(Collectors .toList())
160
172
161
173
startTime.printTotalTime()
162
174
@@ -170,11 +182,11 @@ class ParallelOperationCollectionsUnitTest {
170
182
171
183
val executor = Executors .newFixedThreadPool(people.size)
172
184
val futures = people.map { person ->
173
- executor.submit(Callable {
174
- person.setAdult()
175
- person
176
- })
177
- }.map { it.get() }.filter { it.age > 15 }.sortedBy { it.age }
185
+ executor.submit(Callable {
186
+ person.setAdult()
187
+ person
188
+ })
189
+ }.map { it.get() }.filter { it.age > 15 }.sortedBy { it.age }
178
190
179
191
executor.shutdown()
180
192
@@ -183,3 +195,4 @@ class ParallelOperationCollectionsUnitTest {
183
195
futures.assertResultsTrue()
184
196
}
185
197
}
198
+
0 commit comments