Skip to content

Commit 852f853

Browse files
committed
Adapt ktor tcp transport implementation to the latest change in transport API and simplify some parts
* drop the ability to change dispatcher - always use Default * improve a bit usage of dispatchers and close sequences
1 parent 929c319 commit 852f853

File tree

5 files changed

+154
-207
lines changed

5 files changed

+154
-207
lines changed

rsocket-transports/ktor-tcp/api/rsocket-transport-ktor-tcp.api

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,7 @@ public final class io/rsocket/kotlin/transport/ktor/tcp/KtorTcpClientTransport$F
88
}
99

1010
public abstract interface class io/rsocket/kotlin/transport/ktor/tcp/KtorTcpClientTransportBuilder : io/rsocket/kotlin/transport/RSocketTransportBuilder {
11-
public abstract fun dispatcher (Lkotlin/coroutines/CoroutineContext;)V
12-
public fun inheritDispatcher ()V
1311
public abstract fun selectorManager (Lio/ktor/network/selector/SelectorManager;Z)V
14-
public abstract fun selectorManagerDispatcher (Lkotlin/coroutines/CoroutineContext;)V
1512
public abstract fun socketOptions (Lkotlin/jvm/functions/Function1;)V
1613
}
1714

@@ -31,10 +28,7 @@ public final class io/rsocket/kotlin/transport/ktor/tcp/KtorTcpServerTransport$F
3128
}
3229

3330
public abstract interface class io/rsocket/kotlin/transport/ktor/tcp/KtorTcpServerTransportBuilder : io/rsocket/kotlin/transport/RSocketTransportBuilder {
34-
public abstract fun dispatcher (Lkotlin/coroutines/CoroutineContext;)V
35-
public fun inheritDispatcher ()V
3631
public abstract fun selectorManager (Lio/ktor/network/selector/SelectorManager;Z)V
37-
public abstract fun selectorManagerDispatcher (Lkotlin/coroutines/CoroutineContext;)V
3832
public abstract fun socketOptions (Lkotlin/jvm/functions/Function1;)V
3933
}
4034

rsocket-transports/ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/KtorTcpClientTransport.kt

Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -34,56 +34,44 @@ public sealed interface KtorTcpClientTransport : RSocketTransport {
3434

3535
@OptIn(RSocketTransportApi::class)
3636
public sealed interface KtorTcpClientTransportBuilder : RSocketTransportBuilder<KtorTcpClientTransport> {
37-
public fun dispatcher(context: CoroutineContext)
38-
public fun inheritDispatcher(): Unit = dispatcher(EmptyCoroutineContext)
39-
40-
public fun selectorManagerDispatcher(context: CoroutineContext)
4137
public fun selectorManager(manager: SelectorManager, manage: Boolean)
42-
4338
public fun socketOptions(block: SocketOptions.TCPClientSocketOptions.() -> Unit)
44-
4539
//TODO: TLS support
4640
}
4741

4842
private class KtorTcpClientTransportBuilderImpl : KtorTcpClientTransportBuilder {
49-
private var dispatcher: CoroutineContext = Dispatchers.Default
50-
private var selector: KtorTcpSelector = KtorTcpSelector.FromContext(Dispatchers.IoCompatible)
43+
private var selectorManager: SelectorManager? = null
44+
private var manageSelectorManager: Boolean = true
5145
private var socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {}
5246

53-
override fun dispatcher(context: CoroutineContext) {
54-
check(context[Job] == null) { "Dispatcher shouldn't contain job" }
55-
this.dispatcher = context
56-
}
57-
5847
override fun socketOptions(block: SocketOptions.TCPClientSocketOptions.() -> Unit) {
5948
this.socketOptions = block
6049
}
6150

62-
override fun selectorManagerDispatcher(context: CoroutineContext) {
63-
check(context[Job] == null) { "Dispatcher shouldn't contain job" }
64-
this.selector = KtorTcpSelector.FromContext(context)
65-
}
66-
6751
override fun selectorManager(manager: SelectorManager, manage: Boolean) {
68-
this.selector = KtorTcpSelector.FromInstance(manager, manage)
52+
this.selectorManager = manager
53+
this.manageSelectorManager = manage
6954
}
7055

7156
@RSocketTransportApi
72-
override fun buildTransport(context: CoroutineContext): KtorTcpClientTransport {
73-
val transportContext = context.supervisorContext() + dispatcher
74-
return KtorTcpClientTransportImpl(
75-
coroutineContext = transportContext,
76-
socketOptions = socketOptions,
77-
selectorManager = selector.createFor(transportContext)
78-
)
79-
}
57+
override fun buildTransport(context: CoroutineContext): KtorTcpClientTransport = KtorTcpClientTransportImpl(
58+
coroutineContext = context.supervisorContext() + Dispatchers.Default,
59+
socketOptions = socketOptions,
60+
selectorManager = selectorManager ?: SelectorManager(Dispatchers.IoCompatible),
61+
manageSelectorManager = manageSelectorManager
62+
)
8063
}
8164

8265
private class KtorTcpClientTransportImpl(
8366
override val coroutineContext: CoroutineContext,
8467
private val socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit,
8568
private val selectorManager: SelectorManager,
69+
manageSelectorManager: Boolean,
8670
) : KtorTcpClientTransport {
71+
init {
72+
if (manageSelectorManager) coroutineContext.job.invokeOnCompletion { selectorManager.close() }
73+
}
74+
8775
override fun target(remoteAddress: SocketAddress): RSocketClientTarget = KtorTcpClientTargetImpl(
8876
coroutineContext = coroutineContext.supervisorContext(),
8977
socketOptions = socketOptions,
@@ -101,10 +89,17 @@ private class KtorTcpClientTargetImpl(
10189
private val selectorManager: SelectorManager,
10290
private val remoteAddress: SocketAddress,
10391
) : RSocketClientTarget {
104-
10592
@RSocketTransportApi
106-
override fun connectClient(handler: RSocketConnectionHandler): Job = launch {
107-
val socket = aSocket(selectorManager).tcp().connect(remoteAddress, socketOptions)
108-
handler.handleKtorTcpConnection(socket)
93+
override suspend fun connectClient(): RSocketConnection {
94+
currentCoroutineContext().ensureActive()
95+
coroutineContext.ensureActive()
96+
97+
return withContext(Dispatchers.IoCompatible) {
98+
val socket = aSocket(selectorManager).tcp().connect(remoteAddress, socketOptions)
99+
KtorTcpConnection(
100+
parentContext = this@KtorTcpClientTargetImpl.coroutineContext,
101+
socket = socket
102+
)
103+
}
109104
}
110105
}
Lines changed: 72 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,86 +24,95 @@ import io.rsocket.kotlin.transport.internal.*
2424
import kotlinx.coroutines.*
2525
import kotlinx.coroutines.channels.*
2626
import kotlinx.io.*
27+
import kotlin.coroutines.*
2728

2829
@RSocketTransportApi
29-
internal suspend fun RSocketConnectionHandler.handleKtorTcpConnection(socket: Socket): Unit = coroutineScope {
30-
val outboundQueue = PrioritizationFrameQueue(Channel.BUFFERED)
31-
val inbound = bufferChannel(Channel.BUFFERED)
30+
internal class KtorTcpConnection(
31+
parentContext: CoroutineContext,
32+
private val socket: Socket,
33+
) : RSocketSequentialConnection {
34+
private val outboundQueue = PrioritizationFrameQueue()
35+
private val inbound = bufferChannel(Channel.BUFFERED)
3236

33-
val readerJob = launch {
34-
val input = socket.openReadChannel()
35-
try {
36-
while (true) inbound.send(input.readFrame() ?: break)
37-
input.cancel(null)
38-
} catch (cause: Throwable) {
39-
input.cancel(cause)
40-
throw cause
41-
}
42-
}.onCompletion { inbound.cancel() }
37+
override val coroutineContext: CoroutineContext = parentContext.childContext()
4338

44-
val writerJob = launch {
45-
val output = socket.openWriteChannel()
46-
try {
47-
while (true) {
48-
// we write all available frames here, and only after it flush
49-
// in this case, if there are several buffered frames we can send them in one go
50-
// avoiding unnecessary flushes
51-
output.writeFrame(outboundQueue.dequeueFrame() ?: break)
52-
while (true) output.writeFrame(outboundQueue.tryDequeueFrame() ?: break)
53-
output.flush()
39+
init {
40+
@OptIn(DelicateCoroutinesApi::class)
41+
launch(start = CoroutineStart.ATOMIC) {
42+
val outboundJob = launch {
43+
nonCancellable {
44+
val output = socket.openWriteChannel()
45+
try {
46+
while (true) {
47+
// we write all available frames here, and only after it flush
48+
// in this case, if there are several buffered frames we can send them in one go
49+
// avoiding unnecessary flushes
50+
output.writeFrame(outboundQueue.dequeueFrame() ?: break)
51+
while (true) output.writeFrame(outboundQueue.tryDequeueFrame() ?: break)
52+
output.flush()
53+
}
54+
output.flushAndClose()
55+
} catch (cause: Throwable) {
56+
output.cancel(cause)
57+
throw cause
58+
}
59+
}
60+
}.onCompletion {
61+
outboundQueue.cancel()
5462
}
55-
output.close(null)
56-
} catch (cause: Throwable) {
57-
output.close(cause)
58-
throw cause
59-
}
60-
}.onCompletion { outboundQueue.cancel() }
6163

62-
try {
63-
handleConnection(KtorTcpConnection(outboundQueue, inbound))
64-
} finally {
65-
readerJob.cancel()
66-
outboundQueue.close() // will cause `writerJob` completion
67-
// even if it was cancelled, we still need to close socket and await it closure
68-
withContext(NonCancellable) {
69-
// await completion of read/write and then close socket
70-
readerJob.join()
71-
writerJob.join()
72-
// close socket
73-
socket.close()
74-
socket.socketContext.join()
64+
val inboundJob = launch {
65+
val input = socket.openReadChannel()
66+
try {
67+
while (true) {
68+
inbound.send(input.readFrame() ?: break)
69+
}
70+
input.cancel(null)
71+
} catch (cause: Throwable) {
72+
input.cancel(cause)
73+
throw cause
74+
}
75+
}.onCompletion {
76+
inbound.cancel()
77+
}
78+
79+
try {
80+
awaitCancellation()
81+
} finally {
82+
nonCancellable {
83+
outboundQueue.close()
84+
outboundJob.join()
85+
inboundJob.join()
86+
// await socket completion
87+
socket.close()
88+
socket.socketContext.join()
89+
}
90+
}
7591
}
7692
}
77-
}
7893

79-
@RSocketTransportApi
80-
private class KtorTcpConnection(
81-
private val outboundQueue: PrioritizationFrameQueue,
82-
private val inbound: ReceiveChannel<Buffer>,
83-
) : RSocketSequentialConnection {
84-
override val isClosedForSend: Boolean get() = outboundQueue.isClosedForSend
8594
override suspend fun sendFrame(streamId: Int, frame: Buffer) {
8695
return outboundQueue.enqueueFrame(streamId, frame)
8796
}
8897

8998
override suspend fun receiveFrame(): Buffer? {
9099
return inbound.receiveCatching().getOrNull()
91100
}
92-
}
93101

94-
@OptIn(InternalAPI::class)
95-
private fun ByteWriteChannel.writeFrame(frame: Buffer) {
96-
writeBuffer.writeInt24(frame.size.toInt())
97-
writeBuffer.transferFrom(frame)
98-
}
102+
@OptIn(InternalAPI::class)
103+
private fun ByteWriteChannel.writeFrame(frame: Buffer) {
104+
writeBuffer.writeInt24(frame.size.toInt())
105+
writeBuffer.transferFrom(frame)
106+
}
99107

100-
@OptIn(InternalAPI::class)
101-
private suspend fun ByteReadChannel.readFrame(): Buffer? {
102-
while (availableForRead < 3 && awaitContent(3)) yield()
103-
if (availableForRead == 0) return null
108+
@OptIn(InternalAPI::class)
109+
private suspend fun ByteReadChannel.readFrame(): Buffer? {
110+
while (availableForRead < 3 && awaitContent(3)) yield()
111+
if (availableForRead == 0) return null
104112

105-
val length = readBuffer.readInt24()
106-
return readBuffer(length).also {
107-
it.require(length.toLong())
113+
val length = readBuffer.readInt24()
114+
return readBuffer(length).also {
115+
it.require(length.toLong())
116+
}
108117
}
109118
}

rsocket-transports/ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/KtorTcpSelector.kt

Lines changed: 0 additions & 44 deletions
This file was deleted.

0 commit comments

Comments
 (0)