Skip to content

Commit 278c031

Browse files
authored
Merge pull request #913 from hangga/parallel
KTLN-456 - Parallel Operations on Kotlin Collections
2 parents 49b2096 + d9481e9 commit 278c031

File tree

2 files changed

+199
-0
lines changed

2 files changed

+199
-0
lines changed

core-kotlin-modules/core-kotlin-collections-6/pom.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,17 @@
1313
<version>1.0.0-SNAPSHOT</version>
1414
</parent>
1515

16+
<dependencies>
17+
<dependency>
18+
<groupId>io.reactivex.rxjava3</groupId>
19+
<artifactId>rxkotlin</artifactId>
20+
<version>${rxkotlin.version}</version>
21+
</dependency>
22+
23+
</dependencies>
24+
25+
<properties>
26+
<rxkotlin.version>3.0.1</rxkotlin.version>
27+
</properties>
28+
1629
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
package com.baeldung.parallelOperationsCollections
2+
3+
import io.reactivex.rxjava3.core.Observable
4+
import io.reactivex.rxjava3.kotlin.toObservable
5+
import io.reactivex.rxjava3.schedulers.Schedulers
6+
import kotlinx.coroutines.ExperimentalCoroutinesApi
7+
import kotlinx.coroutines.async
8+
import kotlinx.coroutines.awaitAll
9+
import kotlinx.coroutines.flow.*
10+
import kotlinx.coroutines.runBlocking
11+
import org.assertj.core.api.Assertions.assertThat
12+
import org.junit.jupiter.api.Test
13+
import org.slf4j.LoggerFactory
14+
import java.time.Duration
15+
import java.time.Instant
16+
import java.util.concurrent.Callable
17+
import java.util.concurrent.Executors
18+
import java.util.stream.Collectors
19+
20+
21+
class ParallelOperationCollectionsUnitTest {
22+
23+
private val logger = LoggerFactory.getLogger("")
24+
25+
data class Person(val name: String, val age: Int, var isAdult: Boolean? = null)
26+
27+
private val people = listOf(
28+
Person("Martin", 12),
29+
Person("Ahmad", 42),
30+
Person("Alina", 13),
31+
Person("Alice", 30),
32+
Person("Bob", 16),
33+
Person("Charlie", 40)
34+
)
35+
36+
private fun List<Person>.assertOver15AndSortedByAge() {
37+
assertThat(this).containsExactly(
38+
Person("Bob", 16, false),
39+
Person("Alice", 30, true),
40+
Person("Charlie", 40, true),
41+
Person("Ahmad", 42, true)
42+
)
43+
}
44+
45+
private fun Person.setAdult() {
46+
this.isAdult = this.age >= 18
47+
logger.info(this.toString())
48+
}
49+
50+
private fun Instant.printTotalTime() {
51+
val totalTime = Duration.between(this, Instant.now()).toMillis()
52+
logger.info("Total time taken: {} ms", totalTime)
53+
}
54+
55+
@Test
56+
fun `using coroutines for parallel operations`() = runBlocking {
57+
logger.info("Using Coroutines")
58+
val startTime = Instant.now()
59+
60+
val filteredPeople = people
61+
.map { person ->
62+
async {
63+
person.setAdult()
64+
person
65+
}
66+
}.awaitAll()
67+
.filter { it.age > 15 }
68+
.sortedBy { it.age }
69+
70+
startTime.printTotalTime()
71+
72+
filteredPeople.assertOver15AndSortedByAge()
73+
}
74+
75+
@OptIn(ExperimentalCoroutinesApi::class)
76+
@Test
77+
fun `using coroutines for parallel operations with Flow`() = runBlocking {
78+
logger.info("Using Kotlin Flow")
79+
val startTime = Instant.now()
80+
81+
val filteredPeople = people.asFlow()
82+
.flatMapMerge { person ->
83+
flow {
84+
emit(
85+
async {
86+
person.setAdult()
87+
person
88+
}.await()
89+
)
90+
}
91+
}
92+
.filter { it.age > 15 }.toList()
93+
.sortedBy { it.age }
94+
95+
startTime.printTotalTime()
96+
97+
filteredPeople.assertOver15AndSortedByAge()
98+
}
99+
100+
@Test
101+
fun `using RxJava for parallel operations`() { // Observable.class from io.reactivex;
102+
logger.info("Using RxJava")
103+
val startTime = Instant.now()
104+
105+
val observable = Observable.fromIterable(people)
106+
.flatMap(
107+
{
108+
Observable.just(it).subscribeOn(Schedulers.computation()).doOnNext { person ->
109+
person.setAdult()
110+
}
111+
}, people.size // Uses maxConcurrency for the number of elements
112+
)
113+
.filter { it.age > 15 }
114+
.toList()
115+
.map { it.sortedBy { person -> person.age } }
116+
.blockingGet()
117+
118+
startTime.printTotalTime()
119+
120+
observable.assertOver15AndSortedByAge()
121+
}
122+
123+
@Test
124+
fun `using RxKotlin for parallel operations`() { // ObservableKt.kt.class from io.reactivex.rxkotlin
125+
logger.info("Using RxKotlin")
126+
val startTime = Instant.now()
127+
128+
val observable = people.toObservable()
129+
.flatMap(
130+
{
131+
Observable.just(it).subscribeOn(Schedulers.computation()).doOnNext { person ->
132+
person.setAdult()
133+
}
134+
}, people.size // Uses maxConcurrency for the number of elements
135+
).filter { it.age > 15 }
136+
.toList()
137+
.map { it.sortedBy { person -> person.age } }
138+
.blockingGet()
139+
140+
startTime.printTotalTime()
141+
142+
observable.assertOver15AndSortedByAge()
143+
}
144+
145+
@Test
146+
fun `using parallelStream()`() {
147+
logger.info("Using Stream API")
148+
val startTime = Instant.now()
149+
150+
val filteredPeople = people.parallelStream()
151+
.map { person ->
152+
person.setAdult()
153+
person
154+
}.filter { it.age > 15 }
155+
.sorted { p1, p2 -> p1.age.compareTo(p2.age) }
156+
.collect(Collectors.toList())
157+
158+
startTime.printTotalTime()
159+
160+
filteredPeople.assertOver15AndSortedByAge()
161+
}
162+
163+
@Test
164+
fun `using ExecutorService for parallel operations`() {
165+
logger.info("Using ExecutorService")
166+
val startTime = Instant.now()
167+
168+
val executor = Executors.newFixedThreadPool(people.size)
169+
val futures = people
170+
.map { person ->
171+
executor.submit(Callable {
172+
person.setAdult()
173+
person
174+
}).get()
175+
}
176+
.filter { it.age > 15 }
177+
.sortedBy { it.age }
178+
179+
executor.shutdown()
180+
181+
startTime.printTotalTime()
182+
183+
futures.assertOver15AndSortedByAge()
184+
}
185+
}
186+

0 commit comments

Comments
 (0)