11/*
2- * Copyright 2015-2022 the original author or authors.
2+ * Copyright 2015-2025 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
1414 * limitations under the License.
1515 */
1616
17- package io.rsocket.kotlin.benchmarks
17+ package io.rsocket.kotlin.benchmarks.java
1818
1919import io.rsocket.*
2020import io.rsocket.core.*
2121import io.rsocket.frame.decoder.*
22- import io.rsocket.transport.local.*
22+ import io.rsocket.kotlin.benchmarks.*
23+ import io.rsocket.transport.*
2324import io.rsocket.util.*
25+ import kotlinx.benchmark.*
26+ import kotlinx.benchmark.Mode
2427import kotlinx.coroutines.flow.*
2528import kotlinx.coroutines.reactive.*
2629import org.reactivestreams.*
2730import reactor.core.publisher.*
2831import kotlin.random.*
2932
30- class RSocketJavaBenchmark : RSocketBenchmark <Payload >() {
33+ @BenchmarkMode(Mode .Throughput )
34+ @Warmup(iterations = WARMUP , time = WARMUP_DURATION )
35+ @Measurement(iterations = ITERATION , time = ITERATION_DURATION )
36+ @State(Scope .Benchmark )
37+ abstract class RSocketJavaBenchmark : RSocketBenchmark <Payload , Blackhole >() {
38+ protected abstract val clientTransport: ClientTransport
39+ protected abstract val serverTransport: ServerTransport <* >
40+
41+ private lateinit var payload: Payload
42+ private lateinit var payloadMono: Mono <Payload >
43+ private lateinit var payloadsFlux: Flux <Payload >
44+ private lateinit var payloadsFlow: Flow <Payload >
45+ private lateinit var client: RSocket
46+ private lateinit var server: Closeable
3147
32- lateinit var client: RSocket
33- lateinit var server: Closeable
48+ override fun createPayload (size : Int ): Payload = if (size == 0 ) EmptyPayload .INSTANCE else ByteBufPayload .create(
49+ ByteArray (size / 2 ).also { Random .nextBytes(it) },
50+ ByteArray (size / 2 ).also { Random .nextBytes(it) }
51+ )
52+
53+ override fun createPayloadCopy (): Payload = payload.retain()
54+
55+ override fun releasePayload (payload : Payload ) {
56+ payload.release()
57+ }
3458
35- lateinit var payload: Payload
36- lateinit var payloadMono: Mono <Payload >
37- lateinit var payloadsFlux: Flux <Payload >
38- lateinit var payloadsFlow: Flow <Payload >
59+ override fun consumePayload (bh : Blackhole , value : Payload ) = bh.consume(value)
3960
61+ override suspend fun doRequestResponse (): Payload = client.requestResponse(payload.retain()).awaitSingle()
62+ override fun doRequestStream (): Flow <Payload > = client.requestStream(payload.retain()).asFlow()
63+ override fun doRequestChannel (): Flow <Payload > = client.requestChannel(payloadsFlow.asPublisher()).asFlow()
64+
65+ @Setup
4066 override fun setup () {
4167 payload = createPayload(payloadSize)
42-
4368 payloadMono = Mono .fromSupplier(payload::retain)
4469 payloadsFlux = Flux .range(0 , 5000 ).map { payload.retain() }
4570 payloadsFlow = flow { repeat(5000 ) { emit(payload.retain()) } }
@@ -61,33 +86,48 @@ class RSocketJavaBenchmark : RSocketBenchmark<Payload>() {
6186 })
6287 }
6388 .payloadDecoder(PayloadDecoder .ZERO_COPY )
64- .bind(LocalServerTransport .create( " server " ) )
89+ .bind(serverTransport )
6590 .block()!!
6691
6792 client = RSocketConnector .create()
6893 .payloadDecoder(PayloadDecoder .ZERO_COPY )
69- .connect(LocalClientTransport .create( " server " ) )
94+ .connect(clientTransport )
7095 .block()!!
7196 }
7297
98+ @TearDown
7399 override fun cleanup () {
74100 client.dispose()
75101 server.dispose()
76102 }
77103
78- override fun createPayload (size : Int ): Payload = if (size == 0 ) EmptyPayload .INSTANCE else ByteBufPayload .create(
79- ByteArray (size / 2 ).also { Random .nextBytes(it) },
80- ByteArray (size / 2 ).also { Random .nextBytes(it) }
81- )
104+ @Param(" 0" )
105+ override var payloadSize: Int = 0
82106
83- override fun releasePayload (payload : Payload ) {
84- payload.release()
85- }
107+ @Benchmark
108+ override fun requestResponseBlocking (bh : Blackhole ) = super .requestResponseBlocking(bh)
86109
87- override suspend fun doRequestResponse (): Payload = client.requestResponse(payload.retain()).awaitSingle()
110+ @Benchmark
111+ override fun requestResponseParallel (bh : Blackhole ) = super .requestResponseParallel(bh)
112+
113+ @Benchmark
114+ override fun requestResponseConcurrent (bh : Blackhole ) = super .requestResponseConcurrent(bh)
115+
116+ @Benchmark
117+ override fun requestStreamBlocking (bh : Blackhole ) = super .requestStreamBlocking(bh)
118+
119+ @Benchmark
120+ override fun requestStreamParallel (bh : Blackhole ) = super .requestStreamParallel(bh)
121+
122+ @Benchmark
123+ override fun requestStreamConcurrent (bh : Blackhole ) = super .requestStreamConcurrent(bh)
88124
89- override suspend fun doRequestStream (): Flow <Payload > = client.requestStream(payload.retain()).asFlow()
125+ @Benchmark
126+ override fun requestChannelBlocking (bh : Blackhole ) = super .requestChannelBlocking(bh)
90127
91- override suspend fun doRequestChannel (): Flow <Payload > = client.requestChannel(payloadsFlow.asPublisher()).asFlow()
128+ @Benchmark
129+ override fun requestChannelParallel (bh : Blackhole ) = super .requestChannelParallel(bh)
92130
131+ @Benchmark
132+ override fun requestChannelConcurrent (bh : Blackhole ) = super .requestChannelConcurrent(bh)
93133}
0 commit comments