Skip to content

Commit 0cbc7d4

Browse files
committed
wip netty ws
1 parent f55552b commit 0cbc7d4

File tree

5 files changed

+72
-87
lines changed

5 files changed

+72
-87
lines changed

rsocket-transports/netty-websocket/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/websocket/NettyWebSocketClientTransport.kt

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,15 +35,13 @@ import kotlin.reflect.*
3535

3636
@OptIn(RSocketTransportApi::class)
3737
public sealed interface NettyWebSocketClientTransport : RSocketTransport {
38-
public fun target(configure: WebSocketClientProtocolConfig.Builder.() -> Unit): RSocketClientTarget
39-
public fun target(uri: URI, configure: WebSocketClientProtocolConfig.Builder.() -> Unit = {}): RSocketClientTarget
40-
public fun target(urlString: String, configure: WebSocketClientProtocolConfig.Builder.() -> Unit = {}): RSocketClientTarget
38+
public fun target(uri: URI): RSocketClientTarget
39+
public fun target(urlString: String): RSocketClientTarget
4140

4241
public fun target(
4342
host: String? = null,
4443
port: Int? = null,
4544
path: String? = null,
46-
configure: WebSocketClientProtocolConfig.Builder.() -> Unit = {},
4745
): RSocketClientTarget
4846

4947
public companion object Factory :
@@ -111,8 +109,9 @@ private class NettyWebSocketClientTransportBuilderImpl : NettyWebSocketClientTra
111109
sslContext = sslContext,
112110
bootstrap = bootstrap,
113111
webSocketProtocolConfig = webSocketProtocolConfig,
114-
manageBootstrap = manageEventLoopGroup
115-
)
112+
).also {
113+
if (manageEventLoopGroup) it.shutdownOnCancellation(bootstrap.config().group())
114+
}
116115
}
117116
}
118117

@@ -121,15 +120,8 @@ private class NettyWebSocketClientTransportImpl(
121120
private val sslContext: SslContext?,
122121
private val bootstrap: Bootstrap,
123122
private val webSocketProtocolConfig: (WebSocketClientProtocolConfig.Builder.() -> Unit)?,
124-
manageBootstrap: Boolean,
125123
) : NettyWebSocketClientTransport {
126-
init {
127-
if (manageBootstrap) callOnCancellation {
128-
bootstrap.config().group().shutdownGracefully().awaitFuture()
129-
}
130-
}
131-
132-
override fun target(configure: WebSocketClientProtocolConfig.Builder.() -> Unit): RSocketClientTarget {
124+
private fun target(configure: WebSocketClientProtocolConfig.Builder.() -> Unit): RSocketClientTarget {
133125
val webSocketProtocolConfig = WebSocketClientProtocolConfig.newBuilder().apply {
134126
// transport config first
135127
webSocketProtocolConfig?.invoke(this)
@@ -148,19 +140,18 @@ private class NettyWebSocketClientTransportImpl(
148140
)
149141
}
150142

151-
override fun target(uri: URI, configure: WebSocketClientProtocolConfig.Builder.() -> Unit): RSocketClientTarget = target {
143+
override fun target(uri: URI): RSocketClientTarget = target {
152144
webSocketUri(uri)
153145
}
154146

155-
override fun target(urlString: String, configure: WebSocketClientProtocolConfig.Builder.() -> Unit): RSocketClientTarget = target {
147+
override fun target(urlString: String): RSocketClientTarget = target {
156148
webSocketUri(urlString)
157149
}
158150

159151
override fun target(
160152
host: String?,
161153
port: Int?,
162154
path: String?,
163-
configure: WebSocketClientProtocolConfig.Builder.() -> Unit,
164155
): RSocketClientTarget = target {
165156
webSocketUri(
166157
URI(
@@ -184,9 +175,11 @@ private class NettyWebSocketClientTransportTargetImpl(
184175
private val webSocketProtocolConfig: WebSocketClientProtocolConfig,
185176
private val remoteAddress: InetSocketAddress,
186177
) : RSocketClientTarget {
187-
188178
@RSocketTransportApi
189-
override fun connectClient(handler: RSocketConnectionHandler): Job = launch {
179+
override suspend fun connectClient(): RSocketConnection {
180+
currentCoroutineContext().ensureActive()
181+
coroutineContext.ensureActive()
182+
190183
bootstrap.clone().handler(
191184
NettyWebSocketClientConnectionInitializer(
192185
sslContext = sslContext,
Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,15 +18,33 @@ package io.rsocket.kotlin.transport.netty.websocket
1818

1919
import io.netty.channel.*
2020
import io.netty.channel.socket.*
21+
import io.netty.handler.codec.http.*
2122
import io.netty.handler.codec.http.websocketx.*
23+
import io.netty.handler.ssl.*
2224
import io.rsocket.kotlin.internal.io.*
2325
import io.rsocket.kotlin.transport.*
2426
import io.rsocket.kotlin.transport.internal.*
2527
import io.rsocket.kotlin.transport.netty.internal.*
2628
import kotlinx.coroutines.*
2729
import kotlinx.coroutines.channels.*
28-
import kotlinx.coroutines.channels.Channel
2930
import kotlinx.io.*
31+
import java.net.*
32+
import kotlin.coroutines.*
33+
34+
@RSocketTransportApi
35+
internal class NettyWebSocketConnection(
36+
private val outboundQueue: PrioritizationFrameQueue,
37+
private val inbound: ReceiveChannel<Buffer>,
38+
) : RSocketSequentialConnection {
39+
override val isClosedForSend: Boolean get() = outboundQueue.isClosedForSend
40+
override suspend fun sendFrame(streamId: Int, frame: Buffer) {
41+
return outboundQueue.enqueueFrame(streamId, frame)
42+
}
43+
44+
override suspend fun receiveFrame(): Buffer? {
45+
return inbound.receiveCatching().getOrNull()
46+
}
47+
}
3048

3149
@RSocketTransportApi
3250
internal class NettyWebSocketConnectionHandler(
@@ -126,16 +144,31 @@ private class NettyWebSocketConnectionInboundHandler(
126144
}
127145

128146
@RSocketTransportApi
129-
private class NettyWebSocketConnection(
130-
private val outboundQueue: PrioritizationFrameQueue,
131-
private val inbound: ReceiveChannel<Buffer>,
132-
) : RSocketSequentialConnection {
133-
override val isClosedForSend: Boolean get() = outboundQueue.isClosedForSend
134-
override suspend fun sendFrame(streamId: Int, frame: Buffer) {
135-
return outboundQueue.enqueueFrame(streamId, frame)
136-
}
137-
138-
override suspend fun receiveFrame(): Buffer? {
139-
return inbound.receiveCatching().getOrNull()
147+
internal abstract class NettyWebSocketConnectionInitializer(
148+
private val sslContext: SslContext?,
149+
private val remoteAddress: InetSocketAddress?,
150+
private val handler: RSocketConnectionHandler,
151+
final override val coroutineContext: CoroutineContext,
152+
) : ChannelInitializer<DuplexChannel>(), CoroutineScope {
153+
protected abstract fun createHttpHandler(): ChannelHandler
154+
protected abstract fun createWebSocketHandler(): ChannelHandler
155+
156+
final override fun initChannel(channel: DuplexChannel): Unit = with(channel.pipeline()) {
157+
//addLast(LoggingHandler(if (remoteAddress == null) "server" else "client"))
158+
if (sslContext != null) {
159+
addLast(
160+
"ssl",
161+
when {
162+
remoteAddress != null -> sslContext.newHandler(channel.alloc(), remoteAddress.hostName, remoteAddress.port)
163+
else -> sslContext.newHandler(channel.alloc())
164+
}
165+
)
166+
}
167+
// TODO: should those handlers be configurable?
168+
// what is the the good defaults here and for HttpObjectAggregator
169+
addLast("http", createHttpHandler())
170+
addLast(HttpObjectAggregator(65536))
171+
addLast("websocket", createWebSocketHandler())
172+
addLast("rsocket", NettyWebSocketConnectionHandler(channel, handler, this@NettyWebSocketConnectionInitializer))
140173
}
141174
}
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,41 +16,3 @@
1616

1717
package io.rsocket.kotlin.transport.netty.websocket
1818

19-
import io.netty.channel.*
20-
import io.netty.channel.socket.*
21-
import io.netty.handler.codec.http.*
22-
import io.netty.handler.ssl.*
23-
import io.rsocket.kotlin.transport.*
24-
import kotlinx.coroutines.*
25-
import java.net.*
26-
import kotlin.coroutines.*
27-
28-
@RSocketTransportApi
29-
internal abstract class NettyWebSocketConnectionInitializer(
30-
private val sslContext: SslContext?,
31-
private val remoteAddress: InetSocketAddress?,
32-
private val handler: RSocketConnectionHandler,
33-
final override val coroutineContext: CoroutineContext,
34-
) : ChannelInitializer<DuplexChannel>(), CoroutineScope {
35-
protected abstract fun createHttpHandler(): ChannelHandler
36-
protected abstract fun createWebSocketHandler(): ChannelHandler
37-
38-
final override fun initChannel(channel: DuplexChannel): Unit = with(channel.pipeline()) {
39-
//addLast(LoggingHandler(if (remoteAddress == null) "server" else "client"))
40-
if (sslContext != null) {
41-
addLast(
42-
"ssl",
43-
when {
44-
remoteAddress != null -> sslContext.newHandler(channel.alloc(), remoteAddress.hostName, remoteAddress.port)
45-
else -> sslContext.newHandler(channel.alloc())
46-
}
47-
)
48-
}
49-
// TODO: should those handlers be configurable?
50-
// what is the the good defaults here and for HttpObjectAggregator
51-
addLast("http", createHttpHandler())
52-
addLast(HttpObjectAggregator(65536))
53-
addLast("websocket", createWebSocketHandler())
54-
addLast("rsocket", NettyWebSocketConnectionHandler(channel, handler, this@NettyWebSocketConnectionInitializer))
55-
}
56-
}

rsocket-transports/netty-websocket/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/websocket/NettyWebSocketServerTransport.kt

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -132,8 +132,9 @@ private class NettyWebSocketServerTransportBuilderImpl : NettyWebSocketServerTra
132132
bootstrap = bootstrap,
133133
sslContext = sslContext,
134134
webSocketProtocolConfig = webSocketProtocolConfig,
135-
manageBootstrap = manageEventLoopGroup
136-
)
135+
).also {
136+
if (manageEventLoopGroup) it.shutdownOnCancellation(bootstrap.config().childGroup(), bootstrap.config().group())
137+
}
137138
}
138139
}
139140

@@ -142,15 +143,7 @@ private class NettyWebSocketServerTransportImpl(
142143
private val bootstrap: ServerBootstrap,
143144
private val sslContext: SslContext?,
144145
private val webSocketProtocolConfig: (WebSocketServerProtocolConfig.Builder.() -> Unit)?,
145-
manageBootstrap: Boolean,
146146
) : NettyWebSocketServerTransport {
147-
init {
148-
if (manageBootstrap) callOnCancellation {
149-
bootstrap.config().childGroup().shutdownGracefully().awaitFuture()
150-
bootstrap.config().group().shutdownGracefully().awaitFuture()
151-
}
152-
}
153-
154147
override fun target(
155148
localAddress: InetSocketAddress?,
156149
path: String,
@@ -167,8 +160,12 @@ private class NettyWebSocketServerTransportImpl(
167160
localAddress = localAddress ?: InetSocketAddress(0)
168161
)
169162

170-
override fun target(host: String, port: Int, path: String, protocol: String?): RSocketServerTarget<NettyWebSocketServerInstance> =
171-
target(InetSocketAddress(host, port), path, protocol)
163+
override fun target(
164+
host: String,
165+
port: Int,
166+
path: String,
167+
protocol: String?,
168+
): RSocketServerTarget<NettyWebSocketServerInstance> = target(InetSocketAddress(host, port), path, protocol)
172169
}
173170

174171
@OptIn(RSocketTransportApi::class)
@@ -181,7 +178,7 @@ private class NettyWebSocketServerTargetImpl(
181178
) : RSocketServerTarget<NettyWebSocketServerInstance> {
182179

183180
@RSocketTransportApi
184-
override suspend fun startServer(handler: RSocketConnectionHandler): NettyWebSocketServerInstance {
181+
override suspend fun startServer(onConnection: (RSocketConnection) -> Unit): NettyWebSocketServerInstance {
185182
currentCoroutineContext().ensureActive()
186183
coroutineContext.ensureActive()
187184

settings.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ projects("rsocket-kotlin") {
4545

4646
module("netty-internal")
4747
module("netty-tcp")
48-
// module("netty-websocket")
48+
module("netty-websocket")
4949
module("netty-quic")
5050
}
5151

0 commit comments

Comments
 (0)