Skip to content

Commit 1117850

Browse files
committed
Ping-Pong (two-actor) benchmark (including Akka)
1 parent c18271c commit 1117850

File tree

3 files changed

+180
-7
lines changed

3 files changed

+180
-7
lines changed

benchmarks/pom.xml

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,25 @@
3131
<version>0.14</version>
3232
<packaging>jar</packaging>
3333

34+
<properties>
35+
<jmh.version>1.18</jmh.version>
36+
<javac.target>1.8</javac.target>
37+
<jmh.generator>default</jmh.generator>
38+
<uberjar.name>benchmarks</uberjar.name>
39+
<akka.version>2.0.2</akka.version>
40+
</properties>
41+
3442
<dependencies>
3543
<dependency>
3644
<groupId>org.openjdk.jmh</groupId>
3745
<artifactId>jmh-core</artifactId>
3846
<version>${jmh.version}</version>
3947
</dependency>
48+
<dependency>
49+
<groupId>com.typesafe.akka</groupId>
50+
<artifactId>akka-actor</artifactId>
51+
<version>${akka.version}</version>
52+
</dependency>
4053
<dependency>
4154
<groupId>org.jetbrains.kotlinx</groupId>
4255
<artifactId>kotlinx-coroutines-core</artifactId>
@@ -50,12 +63,13 @@
5063
</dependency>
5164
</dependencies>
5265

53-
<properties>
54-
<jmh.version>1.18</jmh.version>
55-
<javac.target>1.8</javac.target>
56-
<jmh.generator>default</jmh.generator>
57-
<uberjar.name>benchmarks</uberjar.name>
58-
</properties>
66+
<repositories>
67+
<repository>
68+
<id>typesafe</id>
69+
<name>Typesafe Repository</name>
70+
<url>http://repo.typesafe.com/typesafe/releases/</url>
71+
</repository>
72+
</repositories>
5973

6074
<build>
6175
<plugins>

benchmarks/src/main/kotlin/benchmarks/GuideSyncBenchmark.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ GuideSyncBenchmark.sync07Actor avgt 15 1075603.512 ± 203901.350 us
5050
*/
5151

5252
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
53-
@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
53+
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
5454
@Fork(value = 3)
5555
@BenchmarkMode(Mode.AverageTime)
5656
@OutputTimeUnit(TimeUnit.MICROSECONDS)
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
package benchmarks
2+
3+
import akka.actor.*
4+
import kotlinx.coroutines.experimental.CommonPool
5+
import kotlinx.coroutines.experimental.channels.Channel
6+
import kotlinx.coroutines.experimental.channels.SendChannel
7+
import kotlinx.coroutines.experimental.channels.actor
8+
import kotlinx.coroutines.experimental.newFixedThreadPoolContext
9+
import kotlinx.coroutines.experimental.newSingleThreadContext
10+
import kotlinx.coroutines.experimental.runBlocking
11+
import org.openjdk.jmh.annotations.*
12+
import org.openjdk.jmh.annotations.Scope
13+
import java.util.concurrent.TimeUnit
14+
import kotlin.coroutines.experimental.CoroutineContext
15+
16+
/*
17+
18+
Benchmark Mode Cnt Score Error Units
19+
PingPongActorBenchmark.pingPongAkka avgt 15 439.419 ± 24.595 ms/op
20+
PingPongActorBenchmark.pingPongCoroutineCommonPool avgt 15 809.122 ± 18.102 ms/op
21+
PingPongActorBenchmark.pingPongCoroutineMain avgt 15 360.072 ± 4.930 ms/op
22+
PingPongActorBenchmark.pingPongCoroutineSingleThread avgt 15 368.429 ± 3.718 ms/op
23+
PingPongActorBenchmark.pingPongCoroutineTwoThreads avgt 15 615.514 ± 5.292 ms/op
24+
25+
*/
26+
27+
private const val N_MESSAGES = 1_000_000
28+
29+
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
30+
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
31+
@Fork(value = 3)
32+
@BenchmarkMode(Mode.AverageTime)
33+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
34+
@State(Scope.Benchmark)
35+
open class PingPongActorBenchmark {
36+
data class Ball(val count: Int)
37+
class Start
38+
class Stop
39+
40+
class PingActorAkka(val pongRef: ActorRef) : UntypedActor() {
41+
override fun onReceive(msg: Any?) {
42+
when (msg) {
43+
is Start -> {
44+
pongRef.tell(Ball(0), self)
45+
}
46+
is Ball -> {
47+
pongRef.tell(Ball(count = msg.count + 1), self)
48+
}
49+
is Stop -> {
50+
context.system().shutdown()
51+
}
52+
else -> unhandled(msg)
53+
}
54+
}
55+
}
56+
57+
class PongActorAkka() : UntypedActor() {
58+
override fun onReceive(msg: Any?) {
59+
when (msg) {
60+
is Ball -> {
61+
if (msg.count >= N_MESSAGES) {
62+
sender.tell(Stop())
63+
context.stop(self)
64+
} else {
65+
sender.tell(Ball(msg.count + 1))
66+
}
67+
}
68+
else -> unhandled(msg)
69+
}
70+
}
71+
}
72+
73+
@Benchmark
74+
fun pingPongAkka() {
75+
val system = ActorSystem.create("PingPoing")
76+
val pongRef = system.actorOf(Props(UntypedActorFactory { PongActorAkka() }), "pong")
77+
val pingRef = system.actorOf(Props(UntypedActorFactory { PingActorAkka(pongRef) }), "ping")
78+
pingRef.tell(Start())
79+
system.awaitTermination()
80+
}
81+
82+
data class Letter(val msg: Any?, val sender: SendChannel<Letter>)
83+
84+
fun pingActorCoroutine(context: CoroutineContext, pingChannel: SendChannel<Letter>) = actor<Letter>(context) {
85+
var initiator: SendChannel<Letter>? = null
86+
for (letter in channel) with(letter) {
87+
when (msg) {
88+
is Start -> {
89+
initiator = sender
90+
pingChannel.send(Letter(Ball(0), channel))
91+
}
92+
is Ball -> {
93+
pingChannel.send(Letter(Ball(msg.count + 1), channel))
94+
}
95+
is Stop -> {
96+
initiator!!.send(Letter(Stop(), channel))
97+
return@actor
98+
}
99+
else -> error("Cannot happen $msg")
100+
}
101+
}
102+
}
103+
104+
fun pongActorCoroutine(context: CoroutineContext) = actor<Letter>(context) {
105+
for (letter in channel) with (letter) {
106+
when (msg) {
107+
is Ball -> {
108+
if (msg.count >= N_MESSAGES) {
109+
sender.send(Letter(Stop(), channel))
110+
return@actor
111+
} else {
112+
sender.send(Letter(Ball(msg.count + 1), channel))
113+
}
114+
}
115+
else -> error("Cannot happen $msg")
116+
}
117+
}
118+
}
119+
120+
@Benchmark
121+
fun pingPongCoroutineCommonPool() = runBlocking<Unit> {
122+
val pong = pongActorCoroutine(CommonPool)
123+
val ping = pingActorCoroutine(CommonPool, pong)
124+
val me = Channel<Letter>()
125+
ping.send(Letter(Start(), me))
126+
me.receive()
127+
}
128+
129+
@Benchmark
130+
fun pingPongCoroutineMain() = runBlocking<Unit> {
131+
val pong = pongActorCoroutine(context)
132+
val ping = pingActorCoroutine(context, pong)
133+
val me = Channel<Letter>()
134+
ping.send(Letter(Start(), me))
135+
me.receive()
136+
}
137+
138+
val singleThread = newSingleThreadContext("PingPongThread")
139+
140+
@Benchmark
141+
fun pingPongCoroutineSingleThread() = runBlocking<Unit> {
142+
val pong = pongActorCoroutine(singleThread)
143+
val ping = pingActorCoroutine(singleThread, pong)
144+
val me = Channel<Letter>()
145+
ping.send(Letter(Start(), me))
146+
me.receive()
147+
}
148+
149+
val twoThreads = newFixedThreadPoolContext(2, "PingPongThreads")
150+
151+
@Benchmark
152+
fun pingPongCoroutineTwoThreads() = runBlocking<Unit> {
153+
val pong = pongActorCoroutine(twoThreads)
154+
val ping = pingActorCoroutine(twoThreads, pong)
155+
val me = Channel<Letter>()
156+
ping.send(Letter(Start(), me))
157+
me.receive()
158+
}
159+
}

0 commit comments

Comments
 (0)