@@ -21,7 +21,6 @@ import io.reactivex.Single
2121import io.reactivex.disposables.CompositeDisposable
2222import io.reactivex.disposables.Disposable
2323import io.reactivex.processors.UnicastProcessor
24- import io.reactivex.schedulers.Schedulers
2524import io.rsocket.kotlin.*
2625import io.rsocket.kotlin.transport.netty.client.TcpClientTransport
2726import io.rsocket.kotlin.transport.netty.server.NettyContextCloseable
@@ -33,6 +32,7 @@ import org.junit.Test
3332import org.reactivestreams.Publisher
3433import java.net.InetSocketAddress
3534import java.util.concurrent.TimeUnit
35+ import kotlin.math.max
3636
3737class InteractionsStressTest {
3838 private lateinit var server: NettyContextCloseable
@@ -75,10 +75,10 @@ class InteractionsStressTest {
7575 interaction(
7676 { payload -> payload.matches(" response" ) },
7777 {
78- it.flatMapSingle { num ->
78+ it.flatMapSingle( { num ->
7979 client.requestResponse(
8080 DefaultPayload .text(" response$num " ))
81- }
81+ }, false , interactionConcurrency)
8282 })
8383 }
8484
@@ -87,10 +87,10 @@ class InteractionsStressTest {
8787 interaction(
8888 { payload -> payload.matches(" stream" ) },
8989 {
90- it.flatMap { num ->
90+ it.flatMap ( { num ->
9191 client.requestStream(
9292 DefaultPayload .text(" stream$num " ))
93- }
93+ }, false , interactionConcurrency)
9494 })
9595 }
9696
@@ -99,10 +99,10 @@ class InteractionsStressTest {
9999 interaction(
100100 { payload -> payload.matches(" channel" ) },
101101 {
102- it.flatMap { num ->
102+ it.flatMap ( { num ->
103103 client.requestChannel(
104104 Flowable .just(DefaultPayload .text(" channel$num " )))
105- }
105+ }, false , interactionConcurrency)
106106 })
107107 }
108108
@@ -116,8 +116,8 @@ class InteractionsStressTest {
116116
117117 val errors = UnicastProcessor .create<Long >()
118118 val disposable = CompositeDisposable ()
119- repeat(threadsNum() ) {
120- disposable + = interaction(source().observeOn( Schedulers .io()) )
119+ repeat(interactionCount ) {
120+ disposable + = interaction(source())
121121 .subscribe({ res ->
122122 if (! pred(res)) {
123123 errors.onError(
@@ -142,8 +142,6 @@ class InteractionsStressTest {
142142
143143 }
144144
145- private fun threadsNum () = Runtime .getRuntime().availableProcessors()
146-
147145 internal class TestHandler
148146 : AbstractRSocket () {
149147
@@ -170,11 +168,15 @@ class InteractionsStressTest {
170168
171169 companion object {
172170 private fun source () =
173- Flowable .interval(intervalMillis, TimeUnit .MICROSECONDS )
171+ Flowable .interval(intervalMillis, TimeUnit .MILLISECONDS )
174172 .onBackpressureDrop()
175173
176- private const val intervalMillis: Long = 100
174+ private const val intervalMillis: Long = 1
177175
178176 private const val testDuration = 20L
177+
178+ private val interactionCount = max(1 , Runtime .getRuntime().availableProcessors() / 2 )
179+
180+ private const val interactionConcurrency = 4
179181 }
180182}
0 commit comments