Skip to content

Commit 1d3dd26

Browse files
committed
new PR
1 parent 116eb78 commit 1d3dd26

File tree

2 files changed

+191
-0
lines changed

2 files changed

+191
-0
lines changed

core-kotlin-modules/core-kotlin-collections-6/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,16 @@
1313
<version>1.0.0-SNAPSHOT</version>
1414
</parent>
1515

16+
<dependencies>
17+
<dependency>
18+
<groupId>io.reactivex.rxjava2</groupId>
19+
<artifactId>rxkotlin</artifactId>
20+
<version>${rxkotlin.version}</version>
21+
</dependency>
22+
</dependencies>
23+
24+
<properties>
25+
<rxkotlin.version>2.3.0</rxkotlin.version>
26+
</properties>
27+
1628
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
package com.baeldung.parallelOperationsCollections
2+
3+
import io.reactivex.Observable
4+
import io.reactivex.rxkotlin.toObservable
5+
import io.reactivex.schedulers.Schedulers
6+
import kotlinx.coroutines.*
7+
import kotlinx.coroutines.flow.*
8+
import org.assertj.core.api.Assertions.assertThat
9+
import org.junit.jupiter.api.Test
10+
import java.text.SimpleDateFormat
11+
import java.util.concurrent.Callable
12+
import java.util.concurrent.Executors
13+
import java.util.stream.Collectors
14+
15+
16+
class ParallelOperationCollectionsUnitTest {
17+
18+
data class Person(val name: String, val age: Int, var isAdult: Boolean? = null)
19+
20+
private val people = listOf(
21+
Person("Martin", 12),
22+
Person("Ahmad", 42),
23+
Person("Alina", 13),
24+
Person("Alice", 30),
25+
Person("Bob", 16),
26+
Person("Charlie", 40)
27+
)
28+
29+
private val dateFormat = SimpleDateFormat("yyyy-MM-dd:HH:mm:ss:SSS")
30+
31+
private fun assertResults(filteredPeople: List<Person>) {
32+
assertThat(filteredPeople).containsExactly(
33+
Person("Bob", 16, false),
34+
Person("Alice", 30, true),
35+
Person("Charlie", 40, true),
36+
Person("Ahmad", 42, true)
37+
)
38+
}
39+
40+
@Test
41+
fun `using coroutines for parallel operations`() = runBlocking {
42+
val filteredPeople = people.map { person ->
43+
async {
44+
launch {
45+
person.isAdult = person.age >= 18
46+
println(
47+
"%-30s %-40s %s".format(
48+
dateFormat.format(System.currentTimeMillis()),
49+
Thread.currentThread().name,
50+
person
51+
)
52+
)
53+
}
54+
person
55+
}
56+
}.awaitAll().filter { it.age > 15 }.sortedBy { it.age }
57+
58+
assertResults(filteredPeople)
59+
}
60+
61+
@OptIn(ExperimentalCoroutinesApi::class)
62+
@Test
63+
fun `using coroutines for parallel operations with Flow`() = runBlocking {
64+
val filteredPeople = people.asFlow().flatMapMerge { person ->
65+
flow {
66+
emit(async {
67+
person.isAdult = person.age >= 18
68+
69+
println(
70+
"%-30s %-40s %s".format(
71+
dateFormat.format(System.currentTimeMillis()),
72+
Thread.currentThread().name,
73+
person
74+
)
75+
)
76+
person
77+
}.await())
78+
}
79+
}.filter { it.age > 15 }.toList().sortedBy { it.age }
80+
81+
assertResults(filteredPeople)
82+
}
83+
84+
@Test
85+
fun `using RxJava for parallel operations`() { // Observable.class from io.reactivex;
86+
val observable = Observable.fromIterable(people).flatMap({
87+
Observable.just(it).subscribeOn(Schedulers.computation()).doOnNext { person ->
88+
person.isAdult = person.age >= 18
89+
println(
90+
"%-30s %-40s %s".format(
91+
dateFormat.format(System.currentTimeMillis()),
92+
Thread.currentThread().name,
93+
person
94+
)
95+
)
96+
}
97+
}, people.size) // Uses maxConcurrency for the number of elements
98+
.filter { it.age > 15 }.toList().map { it.sortedBy { person -> person.age } }.blockingGet()
99+
100+
assertResults(observable)
101+
}
102+
103+
@Test
104+
fun `using RxKotlin for parallel operations`() { // ObservableKt.kt.class from io.reactivex.rxkotlin
105+
val observable = people.toObservable().flatMap({
106+
Observable.just(it).subscribeOn(Schedulers.computation()).doOnNext { person ->
107+
person.isAdult = person.age >= 18
108+
println(
109+
"%-30s %-40s %s".format(
110+
dateFormat.format(System.currentTimeMillis()),
111+
Thread.currentThread().name,
112+
person
113+
)
114+
)
115+
}
116+
}, people.size) // Uses maxConcurrency for the number of elements
117+
.filter { it.age > 15 }.toList().map { it.sortedBy { person -> person.age } }.blockingGet()
118+
119+
assertResults(observable)
120+
}
121+
122+
@Test
123+
fun `using RxKotlin but still use 1 thread`() { // ObservableKt.kt.class from io.reactivex.rxkotlin
124+
val observable =
125+
people.toObservable().subscribeOn(Schedulers.io()).flatMap { Observable.just(it) }.doOnNext { person ->
126+
person.isAdult = person.age >= 18
127+
println(
128+
"%-30s %-40s %s".format(
129+
dateFormat.format(System.currentTimeMillis()),
130+
Thread.currentThread().name,
131+
person
132+
)
133+
)
134+
}.filter { it.age > 15 }.toList().map { it.sortedBy { person -> person.age } }.blockingGet()
135+
136+
assertResults(observable)
137+
}
138+
139+
@Test
140+
fun `using parallelStream()`() {
141+
val filteredPeople = people.parallelStream().map { person ->
142+
person.isAdult = person.age >= 18
143+
println(
144+
"%-30s %-40s %s".format(
145+
dateFormat.format(System.currentTimeMillis()),
146+
Thread.currentThread().name,
147+
person
148+
)
149+
)
150+
person
151+
}.filter { it.age > 15 }.sorted { p1, p2 -> p1.age.compareTo(p2.age) }.collect(Collectors.toList())
152+
153+
assertResults(filteredPeople)
154+
}
155+
156+
@Test
157+
fun `using ScheduledExecutorService for parallel operations`() {
158+
val executor = Executors.newCachedThreadPool()
159+
val futures = people.map { person ->
160+
executor.submit(Callable {
161+
person.isAdult = person.age >= 18
162+
println(
163+
"%-30s %-40s %s".format(
164+
dateFormat.format(System.currentTimeMillis()),
165+
Thread.currentThread().name,
166+
person
167+
)
168+
)
169+
person
170+
})
171+
}
172+
173+
val filteredPeople = futures.map { it.get() }.filter { it.age > 15 }.sortedBy { it.age }
174+
175+
assertResults(filteredPeople)
176+
177+
executor.shutdown()
178+
}
179+
}

0 commit comments

Comments
 (0)