Skip to content

Commit 77f88ae

Browse files
committed
fix netty QUIC
1 parent 0cbc7d4 commit 77f88ae

File tree

3 files changed

+14
-23
lines changed

3 files changed

+14
-23
lines changed

rsocket-transports/netty-internal/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/internal/io.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,11 @@ public fun Buffer.toByteBuf(allocator: ByteBufAllocator): ByteBuf {
4848
return nettyBuffer
4949
}
5050

51-
// TODO: handle FUTURE result at least in QUIC
5251
public fun Channel.writeBuffer(buffer: Buffer) {
5352
write(buffer.toByteBuf(alloc()), voidPromise())
5453
}
54+
55+
// for quic
56+
public suspend fun Channel.writeAndFlushBuffer(buffer: Buffer) {
57+
writeAndFlush(buffer.toByteBuf(alloc())).awaitFuture()
58+
}

rsocket-transports/netty-quic/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/quic/DebugLoggingHandler.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ internal val QuicStreamChannel.debugName: String
2828
} else {
2929
"REMOTE"
3030
}
31-
return "STREAM-$name"
31+
return "STREAM-$name $this"
3232
}
3333

3434
// TODO: move it somehow to tests

rsocket-transports/netty-quic/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/quic/NettyQuicStream.kt

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import io.rsocket.kotlin.internal.io.*
2525
import io.rsocket.kotlin.transport.*
2626
import io.rsocket.kotlin.transport.netty.internal.*
2727
import kotlinx.coroutines.*
28-
import kotlinx.coroutines.channels.*
2928
import kotlinx.coroutines.channels.Channel
3029
import kotlinx.io.*
3130
import kotlin.coroutines.*
@@ -47,33 +46,24 @@ internal class NettyQuicStream(
4746
init {
4847
@OptIn(DelicateCoroutinesApi::class)
4948
launch(start = CoroutineStart.ATOMIC) {
50-
val outboundJob = launch {
49+
launch {
50+
// TODO: check cancellation?
5151
nonCancellable {
5252
try {
5353
while (true) {
54-
// we write all available frames here, and only after it flush
55-
// in this case, if there are several buffered frames we can send them in one go
56-
// avoiding unnecessary flushes
57-
channel.writeBuffer(outbound.receiveCatching().getOrNull() ?: break)
58-
while (true) channel.writeBuffer(outbound.tryReceive().getOrNull() ?: break)
59-
channel.flush()
54+
channel.writeAndFlushBuffer(outbound.receiveCatching().getOrNull() ?: break)
6055
}
6156
} finally {
6257
outbound.cancel()
63-
// we dont' use `shutdownOutput` here as it will not flush buffered messages!
64-
channel.writeAndFlush(QuicStreamFrame.EMPTY_FIN).awaitFuture()
58+
channel.shutdownOutput().awaitFuture()
6559
}
6660
}
6761
}
6862
try {
6963
awaitCancellation()
7064
} finally {
71-
nonCancellable {
72-
outbound.close()
73-
inbound.cancel()
74-
outboundJob.join()
75-
channel.close().awaitFuture()
76-
}
65+
outbound.close()
66+
inbound.cancel()
7767
}
7868
}
7969
}
@@ -87,8 +77,8 @@ internal class NettyQuicStream(
8777
cancel("exceptionCaught", cause)
8878
}
8979

90-
override fun userEventTriggered(ctx: ChannelHandlerContext?, evt: Any?) {
91-
if (evt is ChannelInputShutdownEvent) inbound.close()
80+
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any?) {
81+
if (evt === ChannelInputShutdownEvent.INSTANCE) inbound.close()
9282
super.userEventTriggered(ctx, evt)
9383
}
9484

@@ -106,8 +96,6 @@ internal class NettyQuicStream(
10696
}
10797

10898
override suspend fun receiveFrame(): Buffer? {
109-
inbound.tryReceive().onSuccess { return it }
110-
channel.read()
11199
return inbound.receiveCatching().getOrNull()
112100
}
113101
}
@@ -116,7 +104,6 @@ internal class NettyQuicStream(
116104
internal object NettyQuicStreamInitializer : ChannelInitializer<QuicStreamChannel>() {
117105
override fun initChannel(channel: QuicStreamChannel) {
118106
// channel.pipeline().addLast(DebugLoggingHandler(channel.debugName))
119-
channel.config().isAutoRead = false
120107

121108
channel.pipeline().addLast(
122109
"rsocket-length-encoder",

0 commit comments

Comments
 (0)