Skip to content

Commit fc87803

Browse files
committed
Introducing scheduler-sensitive benchmarks
1 parent 6dc049d commit fc87803

11 files changed

+1075
-160
lines changed

benchmarks/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ repositories {
77
}
88

99
dependencies {
10-
jmh 'com.typesafe.akka:akka-actor:2.0.2'
10+
jmh 'com.typesafe.akka:akka-actor_2.12:2.5.0'
1111
jmh project(':kotlinx-coroutines-core-common')
1212
jmh project(':kotlinx-coroutines-core')
1313
jmh project(':kotlinx-coroutines-core').sourceSets.test.output
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package benchmarks
2+
3+
import benchmarks.ForkJoinBenchmark.Companion.BATCH_SIZE
4+
import kotlinx.coroutines.experimental.CommonPool
5+
import kotlinx.coroutines.experimental.Deferred
6+
import kotlinx.coroutines.experimental.async
7+
import kotlinx.coroutines.experimental.runBlocking
8+
import org.openjdk.jmh.annotations.*
9+
import java.util.concurrent.*
10+
import kotlin.coroutines.experimental.CoroutineContext
11+
12+
/*
13+
* Comparison of fork-join tasks using specific FJP API and classic [async] jobs.
14+
* FJP job is organized in perfectly balanced binary tree, every leaf node computes
15+
* FPU-heavy sum over its data and intermediate nodes sum results.
16+
*
17+
* Fine-grained batch size (8192 * 1024 tasks, 128 in sequential batch)
18+
* ForkJoinBenchmark.asyncExperimental avgt 10 681.512 ± 32.069 ms/op
19+
* ForkJoinBenchmark.asyncFjp avgt 10 845.386 ± 73.204 ms/op
20+
* ForkJoinBenchmark.fjpRecursiveTask avgt 10 692.120 ± 26.224 ms/op
21+
* ForkJoinBenchmark.fjpTask avgt 10 791.087 ± 66.544 ms/op
22+
*
23+
* Too small tasks (8192 * 1024 tasks, 128 batch, 16 in sequential batch)
24+
* Benchmark Mode Cnt Score Error Units
25+
* ForkJoinBenchmark.asyncExperimental avgt 10 1273.271 ± 190.372 ms/op
26+
* ForkJoinBenchmark.asyncFjp avgt 10 1406.102 ± 216.793 ms/op
27+
* ForkJoinBenchmark.fjpRecursiveTask avgt 10 849.941 ± 141.254 ms/op
28+
* ForkJoinBenchmark.fjpTask avgt 10 831.554 ± 57.276 ms/op
29+
*/
30+
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
31+
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
32+
@Fork(value = 2)
33+
@BenchmarkMode(Mode.AverageTime)
34+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
35+
@State(Scope.Benchmark)
36+
open class ForkJoinBenchmark : ParametrizedDispatcherBase() {
37+
38+
companion object {
39+
/*
40+
* Change task size to control global granularity of benchmark
41+
* Change batch size to control affinity/work stealing/scheduling overhead effects
42+
*/
43+
const val TASK_SIZE = 8192 * 1024
44+
const val BATCH_SIZE = 32 * 8192
45+
}
46+
47+
lateinit var coefficients: LongArray
48+
override var dispatcher: String = "experimental"
49+
50+
@Setup
51+
override fun setup() {
52+
super.setup()
53+
coefficients = LongArray(TASK_SIZE) { ThreadLocalRandom.current().nextLong(0, 1024 * 1024) }
54+
}
55+
56+
@Benchmark
57+
fun asyncFjp() = runBlocking {
58+
startAsync(coefficients, 0, coefficients.size, CommonPool).await()
59+
}
60+
61+
@Benchmark
62+
fun asyncExperimental() = runBlocking {
63+
startAsync(coefficients, 0, coefficients.size, benchmarkContext).await()
64+
}
65+
66+
@Benchmark
67+
fun fjpRecursiveTask(): Double {
68+
val task = RecursiveAction(coefficients, 0, coefficients.size)
69+
return ForkJoinPool.commonPool().submit(task).join()
70+
}
71+
72+
@Benchmark
73+
fun fjpTask(): Double {
74+
val task = Task(coefficients, 0, coefficients.size)
75+
return ForkJoinPool.commonPool().submit(task).join()
76+
}
77+
78+
suspend fun startAsync(coefficients: LongArray, start: Int, end: Int, dispatcher: CoroutineContext): Deferred<Double> = async(dispatcher) {
79+
if (end - start <= BATCH_SIZE) {
80+
compute(coefficients, start, end)
81+
} else {
82+
val first = startAsync(coefficients, start, start + (end - start) / 2, dispatcher)
83+
val second = startAsync(coefficients, start + (end - start) / 2, end, dispatcher)
84+
first.await() + second.await()
85+
}
86+
}
87+
88+
class Task(val coefficients: LongArray, val start: Int, val end: Int) : RecursiveTask<Double>() {
89+
override fun compute(): Double {
90+
if (end - start <= BATCH_SIZE) {
91+
return compute(coefficients, start, end)
92+
}
93+
94+
val first = Task(coefficients, start, start + (end - start) / 2).fork()
95+
val second = Task(coefficients, start + (end - start) / 2, end).fork()
96+
97+
var result = 0.0
98+
result += first.join()
99+
result += second.join()
100+
return result
101+
}
102+
103+
private fun compute(coefficients: LongArray, start: Int, end: Int): Double {
104+
var result = 0.0
105+
for (i in start until end) {
106+
result += Math.sin(Math.pow(coefficients[i].toDouble(), 1.1)) + 1e-8
107+
}
108+
109+
return result
110+
}
111+
}
112+
113+
class RecursiveAction(val coefficients: LongArray, val start: Int, val end: Int, @Volatile var result: Double = 0.0,
114+
parent: RecursiveAction? = null) : CountedCompleter<Double>(parent) {
115+
116+
private var first: ForkJoinTask<Double>? = null
117+
private var second: ForkJoinTask<Double>? = null
118+
119+
override fun getRawResult(): Double {
120+
return result
121+
}
122+
123+
override fun setRawResult(t: Double) {
124+
result = t
125+
}
126+
127+
override fun compute() {
128+
if (end - start <= BATCH_SIZE) {
129+
rawResult = compute(coefficients, start, end)
130+
} else {
131+
pendingCount = 2
132+
// One may fork only once here and executing second task here with looping over firstComplete to be even more efficient
133+
first = RecursiveAction(coefficients, start, start + (end - start) / 2, parent = this).fork()
134+
second = RecursiveAction(coefficients, start + (end - start) / 2, end, parent = this).fork()
135+
}
136+
137+
tryComplete()
138+
}
139+
140+
override fun onCompletion(caller: CountedCompleter<*>?) {
141+
if (caller !== this) {
142+
rawResult = first!!.rawResult + second!!.rawResult
143+
}
144+
super.onCompletion(caller)
145+
}
146+
}
147+
}
148+
149+
150+
private fun compute(coefficients: LongArray, start: Int, end: Int): Double {
151+
var result = 0.0
152+
for (i in start until end) {
153+
result += Math.sin(Math.pow(coefficients[i].toDouble(), 1.1)) + 1e-8
154+
}
155+
156+
return result
157+
}
158+
159+
160+
fun main(args: Array<String>) {
161+
// Hand-rollled test
162+
val arr = LongArray(BATCH_SIZE * 8) { ThreadLocalRandom.current().nextLong(0, 100) }
163+
164+
println(ForkJoinPool.commonPool().submit(ForkJoinBenchmark.Task(arr, 0, arr.size)).join())
165+
println(ForkJoinBenchmark.RecursiveAction(arr, 0, arr.size).invoke())
166+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package benchmarks
2+
3+
import kotlinx.coroutines.experimental.CommonPool
4+
import kotlinx.coroutines.experimental.ThreadPoolDispatcher
5+
import kotlinx.coroutines.experimental.newFixedThreadPoolContext
6+
import org.openjdk.jmh.annotations.Param
7+
import org.openjdk.jmh.annotations.Setup
8+
import org.openjdk.jmh.annotations.TearDown
9+
import java.io.Closeable
10+
import kotlin.coroutines.experimental.CoroutineContext
11+
12+
/**
13+
* Base class to use different [CoroutineContext] in benchmarks via [Param] in inheritors.
14+
* Currently allowed values are "fjp" for [CommonPool] and ftp_n for [ThreadPoolDispatcher] with n threads.
15+
*/
16+
abstract class ParametrizedDispatcherBase {
17+
18+
abstract var dispatcher: String
19+
lateinit var benchmarkContext: CoroutineContext // coroutinesContext clashes with scope parameter
20+
var closeable: Closeable? = null
21+
22+
@Setup
23+
open fun setup() {
24+
benchmarkContext = when {
25+
dispatcher == "fjp" -> CommonPool
26+
dispatcher.startsWith("ftp") -> {
27+
newFixedThreadPoolContext(dispatcher.substring(4).toInt(), dispatcher).also { closeable = it }
28+
}
29+
else -> error("Unexpected dispatcher: $dispatcher")
30+
}
31+
}
32+
33+
@TearDown
34+
fun tearDown() {
35+
closeable?.close()
36+
}
37+
38+
}

benchmarks/src/jmh/kotlin/benchmarks/PingPongActorBenchmark.kt

Lines changed: 0 additions & 159 deletions
This file was deleted.

0 commit comments

Comments
 (0)