11/*
2- * Copyright 2015-2022 the original author or authors.
2+ * Copyright 2015-2024 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
1414 * limitations under the License.
1515 */
1616
17- package io.rsocket.kotlin.benchmarks
17+ package io.rsocket.kotlin.benchmarks.java
1818
1919import io.rsocket.*
2020import io.rsocket.core.*
2121import io.rsocket.frame.decoder.*
22- import io.rsocket.transport.local.*
22+ import io.rsocket.kotlin.benchmarks.*
23+ import io.rsocket.transport.*
2324import io.rsocket.util.*
25+ import kotlinx.benchmark.*
2426import kotlinx.coroutines.flow.*
2527import kotlinx.coroutines.reactive.*
2628import org.reactivestreams.*
2729import reactor.core.publisher.*
2830import kotlin.random.*
2931
30- class RSocketJavaBenchmark : RSocketBenchmark <Payload >() {
32+ @BenchmarkMode(Mode .Throughput )
33+ @Warmup(iterations = WARMUP , time = WARMUP_DURATION )
34+ @Measurement(iterations = ITERATION , time = ITERATION_DURATION )
35+ @State(Scope .Benchmark )
36+ abstract class RSocketJavaBenchmark : RSocketBenchmark <Payload , Blackhole >() {
37+ protected abstract val clientTransport: ClientTransport
38+ protected abstract val serverTransport: ServerTransport <* >
39+
40+ private lateinit var payload: Payload
41+ private lateinit var payloadMono: Mono <Payload >
42+ private lateinit var payloadsFlux: Flux <Payload >
43+ private lateinit var payloadsFlow: Flow <Payload >
44+ private lateinit var client: RSocket
45+ private lateinit var server: Closeable
3146
32- lateinit var client: RSocket
33- lateinit var server: Closeable
47+ override fun createPayload (size : Int ): Payload = if (size == 0 ) EmptyPayload .INSTANCE else ByteBufPayload .create(
48+ ByteArray (size / 2 ).also { Random .nextBytes(it) },
49+ ByteArray (size / 2 ).also { Random .nextBytes(it) }
50+ )
51+
52+ override fun createPayloadCopy (): Payload = payload.retain()
53+
54+ override fun releasePayload (payload : Payload ) {
55+ payload.release()
56+ }
3457
35- lateinit var payload: Payload
36- lateinit var payloadMono: Mono <Payload >
37- lateinit var payloadsFlux: Flux <Payload >
38- lateinit var payloadsFlow: Flow <Payload >
58+ override fun consumePayload (bh : Blackhole , value : Payload ) = bh.consume(value)
3959
60+ override suspend fun doRequestResponse (): Payload = client.requestResponse(payload.retain()).awaitSingle()
61+ override fun doRequestStream (): Flow <Payload > = client.requestStream(payload.retain()).asFlow()
62+ override fun doRequestChannel (): Flow <Payload > = client.requestChannel(payloadsFlow.asPublisher()).asFlow()
63+
64+ @Setup
4065 override fun setup () {
4166 payload = createPayload(payloadSize)
42-
4367 payloadMono = Mono .fromSupplier(payload::retain)
4468 payloadsFlux = Flux .range(0 , 5000 ).map { payload.retain() }
4569 payloadsFlow = flow { repeat(5000 ) { emit(payload.retain()) } }
@@ -61,33 +85,48 @@ class RSocketJavaBenchmark : RSocketBenchmark<Payload>() {
6185 })
6286 }
6387 .payloadDecoder(PayloadDecoder .ZERO_COPY )
64- .bind(LocalServerTransport .create( " server " ) )
88+ .bind(serverTransport )
6589 .block()!!
6690
6791 client = RSocketConnector .create()
6892 .payloadDecoder(PayloadDecoder .ZERO_COPY )
69- .connect(LocalClientTransport .create( " server " ) )
93+ .connect(clientTransport )
7094 .block()!!
7195 }
7296
97+ @TearDown
7398 override fun cleanup () {
7499 client.dispose()
75100 server.dispose()
76101 }
77102
78- override fun createPayload (size : Int ): Payload = if (size == 0 ) EmptyPayload .INSTANCE else ByteBufPayload .create(
79- ByteArray (size / 2 ).also { Random .nextBytes(it) },
80- ByteArray (size / 2 ).also { Random .nextBytes(it) }
81- )
103+ @Param(" 0" )
104+ override var payloadSize: Int = 0
82105
83- override fun releasePayload (payload : Payload ) {
84- payload.release()
85- }
106+ @Benchmark
107+ override fun requestResponseBlocking (bh : Blackhole ) = super .requestResponseBlocking(bh)
86108
87- override suspend fun doRequestResponse (): Payload = client.requestResponse(payload.retain()).awaitSingle()
109+ @Benchmark
110+ override fun requestResponseParallel (bh : Blackhole ) = super .requestResponseParallel(bh)
111+
112+ @Benchmark
113+ override fun requestResponseConcurrent (bh : Blackhole ) = super .requestResponseConcurrent(bh)
114+
115+ @Benchmark
116+ override fun requestStreamBlocking (bh : Blackhole ) = super .requestStreamBlocking(bh)
117+
118+ @Benchmark
119+ override fun requestStreamParallel (bh : Blackhole ) = super .requestStreamParallel(bh)
120+
121+ @Benchmark
122+ override fun requestStreamConcurrent (bh : Blackhole ) = super .requestStreamConcurrent(bh)
88123
89- override suspend fun doRequestStream (): Flow <Payload > = client.requestStream(payload.retain()).asFlow()
124+ @Benchmark
125+ override fun requestChannelBlocking (bh : Blackhole ) = super .requestChannelBlocking(bh)
90126
91- override suspend fun doRequestChannel (): Flow <Payload > = client.requestChannel(payloadsFlow.asPublisher()).asFlow()
127+ @Benchmark
128+ override fun requestChannelParallel (bh : Blackhole ) = super .requestChannelParallel(bh)
92129
130+ @Benchmark
131+ override fun requestChannelConcurrent (bh : Blackhole ) = super .requestChannelConcurrent(bh)
93132}
0 commit comments