Skip to content

Commit 68d449f

Browse files
author
olme04
committed
WIP: improve RSocket interface API
1 parent 88eb3db commit 68d449f

File tree

27 files changed

+352
-310
lines changed

27 files changed

+352
-310
lines changed

README.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@ val client = HttpClient(CIO) {
112112

113113
//optional acceptor for server requests
114114
acceptor {
115-
RSocketRequestHandler {
116-
requestResponse { it } //echo request payload
115+
RSocket {
116+
onRequestResponse { it } //echo request payload
117117
}
118118
}
119119
}
@@ -150,9 +150,9 @@ embeddedServer(CIO) {
150150
routing {
151151
//configure route `url:port/rsocket`
152152
rSocket("rsocket") {
153-
RSocketRequestHandler {
153+
RSocket {
154154
//handler for request/response
155-
requestResponse { request: Payload ->
155+
onRequestResponse { request: Payload ->
156156
//... some work here
157157
delay(500) // work emulation
158158
buildPayload {
@@ -161,7 +161,7 @@ embeddedServer(CIO) {
161161
}
162162
}
163163
//handler for request/stream
164-
requestStream { request: Payload ->
164+
onRequestStream { request: Payload ->
165165
flow {
166166
repeat(1000) { i ->
167167
emit(buildPayload { data("data: $i") })

benchmarks/src/kotlinMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketKotlinBenchmark.kt

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,11 @@ import kotlinx.coroutines.*
2525
import kotlinx.coroutines.flow.*
2626
import kotlin.random.*
2727

28-
@OptIn(ExperimentalStreamsApi::class, DelicateCoroutinesApi::class)
2928
class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
3029
private val requestStrategy = FlowRequestStrategy.RequestBy(64, 0)
3130

3231
private val benchJob = Job()
33-
lateinit var client: RSocket
32+
lateinit var client: ConnectedRSocket
3433

3534
lateinit var payload: Payload
3635
lateinit var payloadsFlow: Flow<Payload>
@@ -41,16 +40,16 @@ class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
4140
payload = createPayload(payloadSize)
4241
payloadsFlow = flow { repeat(5000) { emit(payloadCopy()) } }
4342
val server = RSocketServer().bindIn(CoroutineScope(benchJob + Dispatchers.Unconfined), LocalServerTransport()) {
44-
RSocketRequestHandler {
45-
requestResponse {
43+
RSocket {
44+
onRequestResponse {
4645
it.close()
4746
payloadCopy()
4847
}
49-
requestStream {
48+
onRequestStream {
5049
it.close()
5150
payloadsFlow
5251
}
53-
requestChannel { init, payloads ->
52+
onRequestChannel { init, payloads ->
5453
init.close()
5554
payloads.requestWith(requestStrategy)
5655
}
@@ -63,7 +62,7 @@ class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
6362

6463
override fun cleanup() {
6564
runBlocking {
66-
client.coroutineContext.job.cancelAndJoin()
65+
client.session.coroutineContext.job.cancelAndJoin()
6766
benchJob.cancelAndJoin()
6867
}
6968
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/ConnectionAcceptor.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public fun interface ConnectionAcceptor {
2525

2626
public class ConnectionAcceptorContext internal constructor(
2727
public val config: ConnectionConfig,
28-
public val requester: RSocket,
28+
public val requester: ConnectedRSocket,
2929
)
3030

3131
public class ConnectionConfig internal constructor(

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RSocket.kt

Lines changed: 136 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,38 +18,56 @@ package io.rsocket.kotlin
1818

1919
import io.ktor.utils.io.core.*
2020
import io.rsocket.kotlin.payload.*
21-
import kotlinx.coroutines.*
2221
import kotlinx.coroutines.flow.*
22+
import kotlin.coroutines.*
2323

24-
public interface RSocket : CoroutineScope {
24+
public sealed interface ConnectedRSocket : RSocket {
25+
public val session: RSocketSession
26+
}
2527

26-
public suspend fun metadataPush(metadata: ByteReadPacket) {
27-
metadata.close()
28-
notImplemented("Metadata Push")
29-
}
28+
public interface RSocket {
3029

31-
public suspend fun fireAndForget(payload: Payload) {
32-
payload.close()
33-
notImplemented("Fire and Forget")
34-
}
30+
public suspend fun metadataPush(metadata: ByteReadPacket): Unit =
31+
notImplementedMetadataPush(metadata)
3532

36-
public suspend fun requestResponse(payload: Payload): Payload {
37-
payload.close()
38-
notImplemented("Request Response")
39-
}
33+
public suspend fun fireAndForget(payload: Payload): Unit =
34+
notImplementedFireAndForget(payload)
4035

41-
public fun requestStream(payload: Payload): Flow<Payload> {
42-
payload.close()
43-
notImplemented("Request Stream")
44-
}
36+
public suspend fun requestResponse(payload: Payload): Payload =
37+
notImplementedRequestResponse(payload)
4538

46-
public fun requestChannel(initPayload: Payload, payloads: Flow<Payload>): Flow<Payload> {
47-
initPayload.close()
48-
notImplemented("Request Channel")
49-
}
39+
public fun requestStream(payload: Payload): Flow<Payload> =
40+
notImplementedRequestStream(payload)
41+
42+
public fun requestChannel(initPayload: Payload, payloads: Flow<Payload>): Flow<Payload> =
43+
notImplementedRequestChannel(initPayload, payloads)
44+
45+
}
46+
47+
public sealed interface RSocketBuilder {
48+
public fun onMetadataPush(block: suspend RSocket.(metadata: ByteReadPacket) -> Unit)
49+
public fun onFireAndForget(block: suspend RSocket.(payload: Payload) -> Unit)
50+
public fun onRequestResponse(block: suspend RSocket.(payload: Payload) -> Payload)
51+
public fun onRequestStream(block: suspend RSocket.(payload: Payload) -> Flow<Payload>)
52+
public fun onRequestChannel(block: suspend RSocket.(initPayload: Payload, payloads: Flow<Payload>) -> Flow<Payload>)
53+
}
54+
55+
public inline fun RSocket(block: RSocketBuilder.() -> Unit): RSocket = RSocketImpl().apply(block)
56+
57+
public inline fun RSocket.requestStream(crossinline payloadProvider: suspend () -> Payload): Flow<Payload> = flow {
58+
val payload = payloadProvider()
59+
val response = requestStream(payload)
60+
emitAll(response)
5061
}
5162

52-
private fun notImplemented(operation: String): Nothing = throw NotImplementedError("$operation is not implemented.")
63+
public inline fun RSocket.requestChannel(
64+
payloads: Flow<Payload>,
65+
crossinline initPayloadProvider: suspend () -> Payload,
66+
): Flow<Payload> = flow {
67+
val initPayload = initPayloadProvider()
68+
val response = requestChannel(initPayload, payloads)
69+
emitAll(response)
70+
}
5371

5472
/**
5573
* Tries to emit [value], if emit failed, f.e. due cancellation, calls [Closeable.close] on [value].
@@ -63,3 +81,98 @@ public suspend fun <C : Closeable> FlowCollector<C>.emitOrClose(value: C) {
6381
throw e
6482
}
6583
}
84+
85+
internal abstract class ConnectedRSocketImpl(
86+
final override val coroutineContext: CoroutineContext,
87+
) : ConnectedRSocket, RSocketSession {
88+
final override val session: RSocketSession get() = this
89+
}
90+
91+
internal object EmptyRSocket : RSocket
92+
93+
@PublishedApi
94+
internal class RSocketImpl : RSocketBuilder, RSocket {
95+
private var metadataPush: suspend RSocket.(metadata: ByteReadPacket) -> Unit =
96+
notImplementedMetadataPush
97+
private var fireAndForget: suspend RSocket.(payload: Payload) -> Unit =
98+
notImplementedFireAndForget
99+
private var requestResponse: suspend RSocket.(payload: Payload) -> Payload =
100+
notImplementedRequestResponse
101+
private var requestStream: RSocket.(payload: Payload) -> Flow<Payload> =
102+
notImplementedRequestStream
103+
private var requestChannel: RSocket.(initPayload: Payload, payloads: Flow<Payload>) -> Flow<Payload> =
104+
notImplementedRequestChannel
105+
106+
override suspend fun metadataPush(metadata: ByteReadPacket) =
107+
metadataPush.invoke(this, metadata)
108+
109+
override suspend fun fireAndForget(payload: Payload) =
110+
fireAndForget.invoke(this, payload)
111+
112+
override suspend fun requestResponse(payload: Payload): Payload =
113+
requestResponse.invoke(this, payload)
114+
115+
override fun requestStream(payload: Payload): Flow<Payload> =
116+
requestStream.invoke(this, payload)
117+
118+
override fun requestChannel(initPayload: Payload, payloads: Flow<Payload>): Flow<Payload> =
119+
requestChannel.invoke(this, initPayload, payloads)
120+
121+
override fun onMetadataPush(block: suspend RSocket.(metadata: ByteReadPacket) -> Unit) {
122+
check(metadataPush === notImplementedMetadataPush) { "Metadata Push handler already configured" }
123+
metadataPush = block
124+
}
125+
126+
override fun onFireAndForget(block: suspend RSocket.(payload: Payload) -> Unit) {
127+
check(fireAndForget === notImplementedFireAndForget) { "Fire and Forget handler already configured" }
128+
fireAndForget = block
129+
}
130+
131+
override fun onRequestResponse(block: suspend RSocket.(payload: Payload) -> Payload) {
132+
check(requestResponse === notImplementedRequestResponse) { "Request Response handler already configured" }
133+
requestResponse = block
134+
}
135+
136+
override fun onRequestStream(block: suspend RSocket.(payload: Payload) -> Flow<Payload>) {
137+
check(requestStream === notImplementedRequestStream) { "Request Stream handler already configured" }
138+
requestStream = { payload -> flow { emitAll(block(payload)) } }
139+
}
140+
141+
override fun onRequestChannel(block: suspend RSocket.(initPayload: Payload, payloads: Flow<Payload>) -> Flow<Payload>) {
142+
check(requestChannel === notImplementedRequestChannel) { "Request Channel handler already configured" }
143+
requestChannel = { initPayload, payloads -> flow { emitAll(block(initPayload, payloads)) } }
144+
}
145+
}
146+
147+
private inline fun notImplemented(operation: String): Nothing =
148+
throw NotImplementedError("$operation is not implemented.")
149+
150+
private val notImplementedMetadataPush: suspend RSocket.(metadata: ByteReadPacket) -> Unit =
151+
{ metadata ->
152+
metadata.close()
153+
notImplemented("Metadata Push")
154+
}
155+
156+
private val notImplementedFireAndForget: suspend RSocket.(payload: Payload) -> Unit =
157+
{ payload ->
158+
payload.close()
159+
notImplemented("Fire and Forget")
160+
}
161+
162+
private val notImplementedRequestResponse: suspend RSocket.(payload: Payload) -> Payload =
163+
{ payload ->
164+
payload.close()
165+
notImplemented("Request Response")
166+
}
167+
168+
private val notImplementedRequestStream: RSocket.(payload: Payload) -> Flow<Payload> =
169+
{ payload ->
170+
payload.close()
171+
notImplemented("Request Stream")
172+
}
173+
174+
private val notImplementedRequestChannel: RSocket.(initPayload: Payload, payloads: Flow<Payload>) -> Flow<Payload> =
175+
{ payload, _ ->
176+
payload.close()
177+
notImplemented("Request Channel")
178+
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RSocketError.kt

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.rsocket.kotlin
1818

19+
import io.rsocket.kotlin.frame.*
20+
1921
public sealed class RSocketError(public val errorCode: Int, message: String) : Throwable(message) {
2022

2123
public sealed class Setup(errorCode: Int, message: String) : RSocketError(errorCode, message) {
@@ -53,8 +55,13 @@ public sealed class RSocketError(public val errorCode: Int, message: String) : T
5355
}
5456

5557
@Suppress("FunctionName") // function name intentionally starts with an uppercase letter
56-
internal fun RSocketError(streamId: Int, errorCode: Int, message: String): Throwable =
57-
when (streamId) {
58+
internal fun RSocketError(
59+
frame: ErrorFrame,
60+
): Throwable {
61+
val streamId = frame.streamId
62+
val errorCode = frame.errorCode
63+
val message = frame.data.readText()
64+
return when (streamId) {
5865
0 -> when (errorCode) {
5966
ErrorCode.InvalidSetup -> RSocketError.Setup.Invalid(message)
6067
ErrorCode.UnsupportedSetup -> RSocketError.Setup.Unsupported(message)
@@ -75,6 +82,7 @@ internal fun RSocketError(streamId: Int, errorCode: Int, message: String): Throw
7582
}
7683
}
7784
}
85+
}
7886

7987
internal object ErrorCode {
8088

0 commit comments

Comments
 (0)