Skip to content

Commit 4fefbeb

Browse files
authored
Merge pull request #751 from sazzer/KTLN-752
KTLN-752: A Guide to RSocket with Kotlin
2 parents d147289 + f438222 commit 4fefbeb

File tree

10 files changed

+338
-0
lines changed

10 files changed

+338
-0
lines changed

kotlin-rsocket/pom.xml

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
5+
http://maven.apache.org/xsd/maven-4.0.0.xsd">
6+
<modelVersion>4.0.0</modelVersion>
7+
<groupId>com.baeldung</groupId>
8+
<version>1.0.0-SNAPSHOT</version>
9+
<artifactId>kotlin-rsocket</artifactId>
10+
<parent>
11+
<groupId>com.baeldung</groupId>
12+
<artifactId>kotlin-modules</artifactId>
13+
<version>1.0.0-SNAPSHOT</version>
14+
</parent>
15+
16+
<properties>
17+
<ktor.version>2.3.7</ktor.version>
18+
<rsocket.version>0.15.4</rsocket.version>
19+
</properties>
20+
21+
<dependencies>
22+
<dependency>
23+
<groupId>io.rsocket.kotlin</groupId>
24+
<artifactId>rsocket-core</artifactId>
25+
<version>${rsocket.version}</version>
26+
</dependency>
27+
28+
<dependency>
29+
<groupId>io.ktor</groupId>
30+
<artifactId>ktor-server-netty-jvm</artifactId>
31+
<version>${ktor.version}</version>
32+
</dependency>
33+
<dependency>
34+
<groupId>io.ktor</groupId>
35+
<artifactId>ktor-server-cio-jvm</artifactId>
36+
<version>${ktor.version}</version>
37+
</dependency>
38+
<dependency>
39+
<groupId>io.rsocket.kotlin</groupId>
40+
<artifactId>rsocket-ktor-server-jvm</artifactId>
41+
<version>${rsocket.version}</version>
42+
</dependency>
43+
44+
<dependency>
45+
<groupId>io.ktor</groupId>
46+
<artifactId>ktor-client-cio-jvm</artifactId>
47+
<version>${ktor.version}</version>
48+
</dependency>
49+
<dependency>
50+
<groupId>io.rsocket.kotlin</groupId>
51+
<artifactId>rsocket-ktor-client-jvm</artifactId>
52+
<version>${rsocket.version}</version>
53+
</dependency>
54+
55+
</dependencies>
56+
</project>
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.baeldung.rsocket.fireforget
2+
3+
import io.ktor.client.*
4+
import io.rsocket.kotlin.RSocket
5+
import io.rsocket.kotlin.ktor.client.rSocket
6+
import io.rsocket.kotlin.payload.buildPayload
7+
import io.rsocket.kotlin.payload.data
8+
import kotlinx.coroutines.runBlocking
9+
10+
fun main() {
11+
runBlocking {
12+
val client = HttpClient {
13+
install(io.ktor.client.plugins.websocket.WebSockets)
14+
install(io.rsocket.kotlin.ktor.client.RSocketSupport)
15+
}
16+
17+
val rSocket: RSocket = client.rSocket(host = "localhost", port = 9000, path = "/rsocket/fireAndForget")
18+
rSocket.fireAndForget(buildPayload { data("Hello") })
19+
}
20+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.baeldung.rsocket.fireforget
2+
3+
import io.ktor.server.application.*
4+
import io.ktor.server.routing.*
5+
import io.ktor.server.engine.*
6+
import io.ktor.server.netty.*
7+
import io.rsocket.kotlin.RSocketRequestHandler
8+
import io.rsocket.kotlin.ktor.server.rSocket
9+
import io.rsocket.kotlin.payload.Payload
10+
11+
fun main() {
12+
embeddedServer(Netty, port = 9000) {
13+
install(io.ktor.server.websocket.WebSockets)
14+
install(io.rsocket.kotlin.ktor.server.RSocketSupport)
15+
routing {
16+
// Add RSocket endpoints
17+
18+
rSocket("/rsocket/fireAndForget") {
19+
RSocketRequestHandler {
20+
fireAndForget { request: Payload ->
21+
val text = request.data.readText()
22+
println("Received request: $text")
23+
}
24+
}
25+
}
26+
}
27+
}.start(wait = true)
28+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package com.baeldung.rsocket.requestchannel
2+
3+
import io.ktor.client.*
4+
import io.rsocket.kotlin.RSocket
5+
import io.rsocket.kotlin.emitOrClose
6+
import io.rsocket.kotlin.ktor.client.rSocket
7+
import io.rsocket.kotlin.payload.Payload
8+
import io.rsocket.kotlin.payload.buildPayload
9+
import io.rsocket.kotlin.payload.data
10+
import kotlinx.coroutines.flow.*
11+
import kotlinx.coroutines.runBlocking
12+
import kotlinx.coroutines.time.delay
13+
import java.time.Duration
14+
15+
fun main() {
16+
runBlocking {
17+
val client = HttpClient {
18+
install(io.ktor.client.plugins.websocket.WebSockets)
19+
install(io.rsocket.kotlin.ktor.client.RSocketSupport)
20+
}
21+
22+
val rSocket: RSocket = client.rSocket(port = 9000, path = "/rsocket/requestChannel")
23+
val stream = rSocket.requestChannel(buildPayload { data("Hello") }, flow { produceData() })
24+
25+
stream.onEach { frame ->
26+
val text = frame.data.readText()
27+
println("Received frame: $text")
28+
}.launchIn(this)
29+
}
30+
}
31+
32+
suspend fun FlowCollector<Payload>.produceData() {
33+
for (i in 0..10) {
34+
val data = "Request: $i"
35+
36+
println("Emitting $data")
37+
emitOrClose(buildPayload { data(data) })
38+
39+
delay(Duration.ofMillis(500))
40+
}
41+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.baeldung.rsocket.requestchannel
2+
3+
import io.ktor.server.application.*
4+
import io.ktor.server.routing.*
5+
import io.ktor.server.engine.*
6+
import io.ktor.server.netty.*
7+
import io.rsocket.kotlin.RSocketRequestHandler
8+
import io.rsocket.kotlin.emitOrClose
9+
import io.rsocket.kotlin.ktor.server.rSocket
10+
import io.rsocket.kotlin.payload.Payload
11+
import io.rsocket.kotlin.payload.buildPayload
12+
import io.rsocket.kotlin.payload.data
13+
import kotlinx.coroutines.CancellationException
14+
import kotlinx.coroutines.flow.*
15+
import kotlinx.coroutines.time.delay
16+
import java.time.Duration
17+
18+
fun main() {
19+
embeddedServer(Netty, port = 9000) {
20+
install(io.ktor.server.websocket.WebSockets)
21+
install(io.rsocket.kotlin.ktor.server.RSocketSupport)
22+
routing {
23+
// Add RSocket endpoints
24+
25+
rSocket("/rsocket/requestChannel") {
26+
RSocketRequestHandler {
27+
requestChannel { request: Payload, payloads: Flow<Payload> ->
28+
val text = request.data.readText()
29+
println("Received request: $text")
30+
31+
payloads.onEach { frame ->
32+
val payloadText = frame.data.readText()
33+
println("Received frame: $payloadText")
34+
}.launchIn(this)
35+
36+
flow {
37+
processData(text)
38+
}
39+
}
40+
}
41+
}
42+
}
43+
}.start(wait = true)
44+
}
45+
46+
suspend fun FlowCollector<Payload>.processData(text: String) {
47+
for (i in 0..10) {
48+
val data = "data: ($text)$i"
49+
50+
println("Emitting $data")
51+
emitOrClose(buildPayload { data(data) })
52+
53+
delay(Duration.ofMillis(500))
54+
}
55+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.baeldung.rsocket.requestresponse
2+
3+
import io.ktor.client.*
4+
import io.rsocket.kotlin.RSocket
5+
import io.rsocket.kotlin.ktor.client.rSocket
6+
import io.rsocket.kotlin.payload.buildPayload
7+
import io.rsocket.kotlin.payload.data
8+
import kotlinx.coroutines.runBlocking
9+
10+
fun main() {
11+
runBlocking {
12+
val client = HttpClient {
13+
install(io.ktor.client.plugins.websocket.WebSockets)
14+
install(io.rsocket.kotlin.ktor.client.RSocketSupport)
15+
}
16+
17+
val rSocket: RSocket = client.rSocket(port = 9000, path = "/rsocket/requestResponse")
18+
val response = rSocket.requestResponse(buildPayload { data("Hello") })
19+
20+
val text = response.data.readText()
21+
println("Received response: $text")
22+
}
23+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package com.baeldung.rsocket.requestresponse
2+
3+
import io.ktor.server.application.*
4+
import io.ktor.server.routing.*
5+
import io.ktor.server.engine.*
6+
import io.ktor.server.netty.*
7+
import io.rsocket.kotlin.RSocketRequestHandler
8+
import io.rsocket.kotlin.ktor.server.rSocket
9+
import io.rsocket.kotlin.payload.Payload
10+
import io.rsocket.kotlin.payload.buildPayload
11+
import io.rsocket.kotlin.payload.data
12+
import kotlinx.coroutines.time.delay
13+
import java.time.Duration
14+
15+
fun main() {
16+
embeddedServer(Netty, port = 9000) {
17+
install(io.ktor.server.websocket.WebSockets)
18+
install(io.rsocket.kotlin.ktor.server.RSocketSupport)
19+
routing {
20+
// Add RSocket endpoints
21+
22+
rSocket("/rsocket/requestResponse") {
23+
RSocketRequestHandler {
24+
requestResponse { request: Payload ->
25+
val text = request.data.readText()
26+
println("Received request: $text")
27+
28+
delay(Duration.ofSeconds(5))
29+
30+
buildPayload { data(text.reversed()) }
31+
}
32+
}
33+
}
34+
}
35+
}.start(wait = true)
36+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.baeldung.rsocket.requeststream
2+
3+
import io.ktor.client.*
4+
import io.rsocket.kotlin.RSocket
5+
import io.rsocket.kotlin.ktor.client.rSocket
6+
import io.rsocket.kotlin.payload.buildPayload
7+
import io.rsocket.kotlin.payload.data
8+
import kotlinx.coroutines.flow.*
9+
import kotlinx.coroutines.runBlocking
10+
11+
fun main() {
12+
runBlocking {
13+
val client = HttpClient {
14+
install(io.ktor.client.plugins.websocket.WebSockets)
15+
install(io.rsocket.kotlin.ktor.client.RSocketSupport)
16+
}
17+
18+
val rSocket: RSocket = client.rSocket(port = 9000, path = "/rsocket/requestStream")
19+
val stream = rSocket.requestStream(buildPayload { data("Hello") })
20+
21+
stream.onEach { frame ->
22+
val text = frame.data.readText()
23+
println("Received frame: $text")
24+
}.launchIn(this)
25+
}
26+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package com.baeldung.rsocket.requeststream
2+
3+
import io.ktor.server.application.*
4+
import io.ktor.server.routing.*
5+
import io.ktor.server.engine.*
6+
import io.ktor.server.netty.*
7+
import io.rsocket.kotlin.RSocketRequestHandler
8+
import io.rsocket.kotlin.emitOrClose
9+
import io.rsocket.kotlin.ktor.server.rSocket
10+
import io.rsocket.kotlin.payload.Payload
11+
import io.rsocket.kotlin.payload.buildPayload
12+
import io.rsocket.kotlin.payload.data
13+
import kotlinx.coroutines.CancellationException
14+
import kotlinx.coroutines.flow.FlowCollector
15+
import kotlinx.coroutines.flow.flow
16+
import kotlinx.coroutines.flow.onCompletion
17+
import kotlinx.coroutines.time.delay
18+
import java.time.Duration
19+
20+
fun main() {
21+
embeddedServer(Netty, port = 9000) {
22+
install(io.ktor.server.websocket.WebSockets)
23+
install(io.rsocket.kotlin.ktor.server.RSocketSupport)
24+
routing {
25+
// Add RSocket endpoints
26+
27+
rSocket("/rsocket/requestStream") {
28+
RSocketRequestHandler {
29+
requestStream { request: Payload ->
30+
val text = request.data.readText()
31+
println("Received request: $text")
32+
33+
flow {
34+
processData(text)
35+
}
36+
}
37+
}
38+
}
39+
}
40+
}.start(wait = true)
41+
}
42+
43+
suspend fun FlowCollector<Payload>.processData(text: String) {
44+
for (i in 0..10) {
45+
val data = "data: ($text)$i"
46+
47+
println("Emitting $data")
48+
emitOrClose(buildPayload { data(data) })
49+
50+
delay(Duration.ofMillis(500))
51+
}
52+
}

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,7 @@
410410
<module>kotlin-math-2</module>
411411
<module>kotlin-spark</module>
412412
<module>kotlin-prim</module>
413+
<module>kotlin-rsocket</module>
413414
</modules>
414415

415416
<profiles>

0 commit comments

Comments
 (0)