diff --git a/libp2p/src/main/kotlin/io/libp2p/core/mux/StreamMuxerProtocol.kt b/libp2p/src/main/kotlin/io/libp2p/core/mux/StreamMuxerProtocol.kt index 3f7f460a0..ab9f1fbb5 100644 --- a/libp2p/src/main/kotlin/io/libp2p/core/mux/StreamMuxerProtocol.kt +++ b/libp2p/src/main/kotlin/io/libp2p/core/mux/StreamMuxerProtocol.kt @@ -22,7 +22,7 @@ fun interface StreamMuxerProtocol { } /** - * @param maxBufferedConnectionWrites the maximum amount of bytes in the write buffer per connection + * @param maxBufferedConnectionWrites the maximum amount of bytes in the internal write buffer per connection */ @JvmStatic @JvmOverloads diff --git a/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/AbstractMuxHandler.kt b/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/AbstractMuxHandler.kt index e072ef8b5..68fd27a3d 100644 --- a/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/AbstractMuxHandler.kt +++ b/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/AbstractMuxHandler.kt @@ -83,6 +83,8 @@ abstract class AbstractMuxHandler() : */ abstract fun releaseMessage(msg: TData) + abstract fun isChildWritable(child: MuxChannel): Boolean + abstract fun onChildWrite(child: MuxChannel, data: TData) protected fun onRemoteOpen(id: MuxId) { diff --git a/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/MuxChannel.kt b/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/MuxChannel.kt index 855046c5a..84680d902 100644 --- a/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/MuxChannel.kt +++ b/libp2p/src/main/kotlin/io/libp2p/etc/util/netty/mux/MuxChannel.kt @@ -32,6 +32,10 @@ class MuxChannel( initializer(this) } + override fun isWritable(): Boolean { + return super.isWritable() && parent.isChildWritable(this) + } + override fun doWrite(buf: ChannelOutboundBuffer) { while (true) { val msg = buf.current() ?: break diff --git a/libp2p/src/main/kotlin/io/libp2p/mux/mplex/MplexHandler.kt b/libp2p/src/main/kotlin/io/libp2p/mux/mplex/MplexHandler.kt index 4e061cbab..4d32cdedf 100644 --- a/libp2p/src/main/kotlin/io/libp2p/mux/mplex/MplexHandler.kt +++ b/libp2p/src/main/kotlin/io/libp2p/mux/mplex/MplexHandler.kt @@ -33,6 +33,10 @@ open class MplexHandler( } } + override fun isChildWritable(child: MuxChannel): Boolean { + return true + } + override fun onChildWrite(child: MuxChannel, data: ByteBuf) { val ctx = getChannelHandlerContext() data.sliceMaxSize(maxFrameDataLength) diff --git a/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxHandler.kt b/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxHandler.kt index 79fc1049f..f44da2821 100644 --- a/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxHandler.kt +++ b/libp2p/src/main/kotlin/io/libp2p/mux/yamux/YamuxHandler.kt @@ -165,6 +165,15 @@ open class YamuxHandler( goAwayPromise.complete(msg.length) } + override fun isChildWritable(child: MuxChannel): Boolean { + val windowSize = windowSizes[child.id]?.send + return if (windowSize == null) { + false + } else { + windowSize.get() > 0 + } + } + override fun onChildWrite(child: MuxChannel, data: ByteBuf) { val windowSize = windowSizes[child.id]?.send if (windowSize == null) { @@ -192,12 +201,13 @@ open class YamuxHandler( val frame = YamuxFrame(child.id, YamuxType.DATA, 0, length.toLong(), slicedData) getChannelHandlerContext().writeAndFlush(frame) } else { - // wait until the window is increased to send + // add to internal outbound buffer until the window is increased addToSendBuffer(child, data) } } } + // Can't rely only on the Netty outbound buffer to handle window updates, so specifying an internal outbound buffer private fun addToSendBuffer(child: MuxChannel, data: ByteBuf) { val buffer = sendBuffers.getOrPut(child.id) { SendBuffer(child.id) } buffer.add(data)