Skip to content

Commit 82a918a

Browse files
authored
Merge pull request #457
Coroutines scheduler
2 parents 7f2ed2f + 9542f1c commit 82a918a

File tree

77 files changed

+4406
-382
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+4406
-382
lines changed

benchmarks/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ repositories {
1111
}
1212

1313
dependencies {
14-
jmh 'com.typesafe.akka:akka-actor:2.0.2'
14+
jmh 'com.typesafe.akka:akka-actor_2.12:2.5.0'
1515
jmh project(':kotlinx-coroutines-core-common')
1616
jmh project(':kotlinx-coroutines-core')
1717
jmh project(':kotlinx-coroutines-core').sourceSets.test.output
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package benchmarks
2+
3+
import benchmarks.ForkJoinBenchmark.Companion.BATCH_SIZE
4+
import kotlinx.coroutines.experimental.CommonPool
5+
import kotlinx.coroutines.experimental.Deferred
6+
import kotlinx.coroutines.experimental.async
7+
import kotlinx.coroutines.experimental.runBlocking
8+
import org.openjdk.jmh.annotations.*
9+
import java.util.concurrent.*
10+
import kotlin.coroutines.experimental.CoroutineContext
11+
12+
/*
13+
* Comparison of fork-join tasks using specific FJP API and classic [async] jobs.
14+
* FJP job is organized in perfectly balanced binary tree, every leaf node computes
15+
* FPU-heavy sum over its data and intermediate nodes sum results.
16+
*
17+
* Fine-grained batch size (8192 * 1024 tasks, 128 in sequential batch)
18+
* ForkJoinBenchmark.asyncExperimental avgt 10 681.512 ± 32.069 ms/op
19+
* ForkJoinBenchmark.asyncFjp avgt 10 845.386 ± 73.204 ms/op
20+
* ForkJoinBenchmark.fjpRecursiveTask avgt 10 692.120 ± 26.224 ms/op
21+
* ForkJoinBenchmark.fjpTask avgt 10 791.087 ± 66.544 ms/op
22+
*
23+
* Too small tasks (8192 * 1024 tasks, 128 batch, 16 in sequential batch)
24+
* Benchmark Mode Cnt Score Error Units
25+
* ForkJoinBenchmark.asyncExperimental avgt 10 1273.271 ± 190.372 ms/op
26+
* ForkJoinBenchmark.asyncFjp avgt 10 1406.102 ± 216.793 ms/op
27+
* ForkJoinBenchmark.fjpRecursiveTask avgt 10 849.941 ± 141.254 ms/op
28+
* ForkJoinBenchmark.fjpTask avgt 10 831.554 ± 57.276 ms/op
29+
*/
30+
/**
 * JMH benchmark that sums an FPU-heavy function over [coefficients] using a balanced binary
 * decomposition, implemented four ways: coroutine [async] jobs on [CommonPool], [async] jobs on
 * [benchmarkContext], a [CountedCompleter]-based task and a [RecursiveTask]-based task.
 */
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 2)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
open class ForkJoinBenchmark : ParametrizedDispatcherBase() {

    companion object {
        /*
         * Change task size to control global granularity of benchmark
         * Change batch size to control affinity/work stealing/scheduling overhead effects
         */
        const val TASK_SIZE = 8192 * 1024
        const val BATCH_SIZE = 32 * 8192
    }

    /** Input data; filled with [TASK_SIZE] random values in `[0, 1024 * 1024)` by [setup]. */
    lateinit var coefficients: LongArray

    /** Dispatcher name consumed by the base class [setup]; fixed to "experimental" here. */
    override var dispatcher: String = "experimental"

    /** Initialises [benchmarkContext] through the base class and generates the input array. */
    @Setup
    override fun setup() {
        super.setup()
        coefficients = LongArray(TASK_SIZE) { ThreadLocalRandom.current().nextLong(0, 1024 * 1024) }
    }

    /** Runs the [async]-based decomposition on [CommonPool] and waits for the total. */
    @Benchmark
    fun asyncFjp() = runBlocking {
        startAsync(coefficients, 0, coefficients.size, CommonPool).await()
    }

    /** Runs the [async]-based decomposition on [benchmarkContext] and waits for the total. */
    @Benchmark
    fun asyncExperimental() = runBlocking {
        startAsync(coefficients, 0, coefficients.size, benchmarkContext).await()
    }

    /**
     * Runs the [CountedCompleter]-based implementation ([RecursiveAction]) on the common FJP.
     * Note that, despite the benchmark name, this is not the [RecursiveTask] variant.
     */
    @Benchmark
    fun fjpRecursiveTask(): Double {
        val task = RecursiveAction(coefficients, 0, coefficients.size)
        return ForkJoinPool.commonPool().submit(task).join()
    }

    /** Runs the [RecursiveTask]-based implementation ([Task]) on the common FJP. */
    @Benchmark
    fun fjpTask(): Double {
        val task = Task(coefficients, 0, coefficients.size)
        return ForkJoinPool.commonPool().submit(task).join()
    }

    /**
     * Starts an [async] job in [dispatcher] that sums the range `[start, end)` of [coefficients].
     * Ranges of at most [BATCH_SIZE] elements are computed sequentially; larger ranges are split
     * in half, each half is started as its own [async] job and the two results are added.
     */
    suspend fun startAsync(coefficients: LongArray, start: Int, end: Int, dispatcher: CoroutineContext): Deferred<Double> = async(dispatcher) {
        if (end - start <= BATCH_SIZE) {
            compute(coefficients, start, end)
        } else {
            val mid = start + (end - start) / 2
            val first = startAsync(coefficients, start, mid, dispatcher)
            val second = startAsync(coefficients, mid, end, dispatcher)
            first.await() + second.await()
        }
    }

    /**
     * [RecursiveTask] implementation: forks both halves of `[start, end)` and joins them,
     * falling back to the sequential file-level `compute` for ranges of at most [BATCH_SIZE].
     */
    class Task(val coefficients: LongArray, val start: Int, val end: Int) : RecursiveTask<Double>() {
        override fun compute(): Double {
            if (end - start <= BATCH_SIZE) {
                // Three-argument call resolves to the file-level helper, not to this override.
                return compute(coefficients, start, end)
            }

            val mid = start + (end - start) / 2
            // Both halves are forked on purpose; the benchmark measures this scheduling shape.
            val first = Task(coefficients, start, mid).fork()
            val second = Task(coefficients, mid, end).fork()
            return first.join() + second.join()
        }
    }

    /**
     * [CountedCompleter] implementation. A non-leaf node forks two children that complete
     * towards it and publishes the sum of their results from [onCompletion].
     */
    class RecursiveAction(val coefficients: LongArray, val start: Int, val end: Int, @Volatile var result: Double = 0.0,
                          parent: RecursiveAction? = null) : CountedCompleter<Double>(parent) {

        // Set only on non-leaf nodes, before this node calls tryComplete() on itself.
        private var first: ForkJoinTask<Double>? = null
        private var second: ForkJoinTask<Double>? = null

        override fun getRawResult(): Double {
            return result
        }

        override fun setRawResult(t: Double) {
            result = t
        }

        override fun compute() {
            if (end - start <= BATCH_SIZE) {
                rawResult = compute(coefficients, start, end)
            } else {
                val mid = start + (end - start) / 2
                // Two children plus this node's own tryComplete() below: the third call completes the node.
                pendingCount = 2
                // One may fork only once here and executing second task here with looping over firstComplete to be even more efficient
                first = RecursiveAction(coefficients, start, mid, parent = this).fork()
                second = RecursiveAction(coefficients, mid, end, parent = this).fork()
            }

            tryComplete()
        }

        override fun onCompletion(caller: CountedCompleter<*>?) {
            // Reduce whenever this node has children. Checking `caller !== this` is not enough:
            // if both children finish before this node's own tryComplete(), the completing call
            // is made by this node itself (caller === this) and the sum would be silently skipped.
            val left = first
            val right = second
            if (left != null && right != null) {
                rawResult = left.rawResult + right.rawResult
            }
            super.onCompletion(caller)
        }
    }
}
148+
149+
150+
/**
 * Sequentially sums `sin(coefficients[i]^1.1) + 1e-8` for every index `i` in `[start, end)`.
 *
 * @return the accumulated sum, or `0.0` when the range is empty
 */
private fun compute(coefficients: LongArray, start: Int, end: Int): Double {
    var sum = 0.0
    var index = start
    while (index < end) {
        val term = Math.sin(Math.pow(coefficients[index].toDouble(), 1.1)) + 1e-8
        sum += term
        index++
    }
    return sum
}
158+
159+
160+
/**
 * Hand-rolled smoke test: sums the same random array with the [RecursiveTask]-based and the
 * [CountedCompleter]-based implementations and prints both totals.
 */
fun main(args: Array<String>) {
    val data = LongArray(BATCH_SIZE * 8) { ThreadLocalRandom.current().nextLong(0, 100) }

    val viaRecursiveTask = ForkJoinPool.commonPool().submit(ForkJoinBenchmark.Task(data, 0, data.size)).join()
    println(viaRecursiveTask)

    val viaCountedCompleter = ForkJoinBenchmark.RecursiveAction(data, 0, data.size).invoke()
    println(viaCountedCompleter)
}

benchmarks/src/jmh/kotlin/benchmarks/GuideSyncBenchmark.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,36 +45,36 @@ open class GuideSyncBenchmark {
4545

4646
@Benchmark
4747
fun sync01Problem() {
48-
guide.sync.example01.main(emptyArray())
48+
kotlinx.coroutines.experimental.guide.sync01.main(emptyArray())
4949
}
5050

5151
@Benchmark
5252
fun sync02Volatile() {
53-
guide.sync.example02.main(emptyArray())
53+
kotlinx.coroutines.experimental.guide.sync02.main(emptyArray())
5454
}
5555

5656
@Benchmark
5757
fun sync03AtomicInt() {
58-
guide.sync.example03.main(emptyArray())
58+
kotlinx.coroutines.experimental.guide.sync03.main(emptyArray())
5959
}
6060

6161
@Benchmark
6262
fun sync04ConfineFine() {
63-
guide.sync.example04.main(emptyArray())
63+
kotlinx.coroutines.experimental.guide.sync04.main(emptyArray())
6464
}
6565

6666
@Benchmark
6767
fun sync05ConfineCoarse() {
68-
guide.sync.example05.main(emptyArray())
68+
kotlinx.coroutines.experimental.guide.sync05.main(emptyArray())
6969
}
7070

7171
@Benchmark
7272
fun sync06Mutex() {
73-
guide.sync.example06.main(emptyArray())
73+
kotlinx.coroutines.experimental.guide.sync06.main(emptyArray())
7474
}
7575

7676
@Benchmark
7777
fun sync07Actor() {
78-
guide.sync.example07.main(emptyArray())
78+
kotlinx.coroutines.experimental.guide.sync07.main(emptyArray())
7979
}
8080
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package benchmarks
2+
3+
import kotlinx.coroutines.experimental.launch
4+
import org.openjdk.jmh.annotations.*
5+
import java.util.concurrent.CyclicBarrier
6+
import java.util.concurrent.TimeUnit
7+
8+
/*
9+
* Benchmark to measure scheduling overhead in comparison with FJP.
10+
* LaunchBenchmark.massiveLaunch experimental avgt 30 328.662 ± 52.789 us/op
11+
* LaunchBenchmark.massiveLaunch fjp avgt 30 179.762 ± 3.931 us/op
12+
*/
13+
@Warmup(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 2)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Benchmark)
open class LaunchBenchmark : ParametrizedDispatcherBase() {

    @Param("experimental", "fjp")
    override var dispatcher: String = "fjp"

    // Number of empty child coroutines each submitter launches and joins.
    private val jobsToLaunch = 100

    // Number of submitter coroutines started per benchmark invocation.
    private val submitters = 4

    // Trips once every submitter coroutine is running.
    private val allLaunched = CyclicBarrier(submitters)

    // Trips once every submitter has joined its children and the benchmark thread has arrived.
    private val stopBarrier = CyclicBarrier(submitters + 1)

    /**
     * Starts [submitters] coroutines in [benchmarkContext]. After all of them have reached
     * [allLaunched], each launches [jobsToLaunch] empty coroutines in its own context and joins
     * them. The calling thread returns once every submitter has reached [stopBarrier].
     */
    @Benchmark
    fun massiveLaunch() {
        repeat(submitters) {
            launch(benchmarkContext) {
                // Wait until all cores are occupied
                allLaunched.await()
                allLaunched.reset()

                val children = (1..jobsToLaunch).map {
                    launch(coroutineContext) {
                        // do nothing
                    }
                }
                children.forEach { child -> child.join() }

                stopBarrier.await()
            }
        }

        stopBarrier.await()
        stopBarrier.reset()
    }

}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package benchmarks
2+
3+
import benchmarks.actors.CORES_COUNT
4+
import kotlinx.coroutines.experimental.CommonPool
5+
import kotlinx.coroutines.experimental.ThreadPoolDispatcher
6+
import kotlinx.coroutines.experimental.newFixedThreadPoolContext
7+
import kotlinx.coroutines.experimental.scheduling.*
8+
import org.openjdk.jmh.annotations.Param
9+
import org.openjdk.jmh.annotations.Setup
10+
import org.openjdk.jmh.annotations.TearDown
11+
import java.io.Closeable
12+
import kotlin.coroutines.experimental.CoroutineContext
13+
14+
/**
 * Base class to use different [CoroutineContext] in benchmarks via [Param] in inheritors.
 * Currently allowed values are "fjp" for [CommonPool], "experimental" for an
 * [ExperimentalCoroutineDispatcher] constructed with [CORES_COUNT], and ftp_n for
 * [ThreadPoolDispatcher] with n threads.
 */
abstract class ParametrizedDispatcherBase {

    /** Name of the dispatcher to benchmark; see the class documentation for accepted values. */
    abstract var dispatcher: String

    /** Context selected by [setup] according to [dispatcher]. */
    lateinit var benchmarkContext: CoroutineContext // coroutineContext clashes with scope parameter

    /** Resource created by [setup] that [tearDown] must close; `null` when there is none. */
    var closeable: Closeable? = null

    /**
     * Resolves [dispatcher] into [benchmarkContext], remembering any closeable resource it creates.
     *
     * @throws IllegalStateException if [dispatcher] is not a recognised value or its thread count
     * cannot be parsed
     */
    @Setup
    open fun setup() {
        benchmarkContext = when {
            dispatcher == "fjp" -> CommonPool
            dispatcher == "experimental" -> {
                ExperimentalCoroutineDispatcher(CORES_COUNT).also { closeable = it }
            }
            dispatcher.startsWith("ftp") -> {
                // Skip the 4-character "ftp_" prefix. drop() tolerates short strings, so a malformed
                // value fails with the same clear message as the `else` branch below.
                val threads = dispatcher.drop(4).toIntOrNull() ?: error("Unexpected dispatcher: $dispatcher")
                newFixedThreadPoolContext(threads, dispatcher).also { closeable = it }
            }
            else -> error("Unexpected dispatcher: $dispatcher")
        }
    }

    /** Closes the resource created by [setup], if any, and forgets it so it is never closed twice. */
    @TearDown
    fun tearDown() {
        closeable?.close()
        closeable = null
    }

}

0 commit comments

Comments
 (0)