Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ List of components in the Libp2p spec and their JVM implementation status
| | Component | Status |
|--------------------------|-------------------------------------------------------------------------------------------------|:----------------:|
| **Transport** | tcp | :green_apple: |
| | [quic](https://github.com/libp2p/specs/tree/master/quic) | :tomato: |
| | [quic](https://github.com/libp2p/specs/tree/master/quic) | :lemon: |
| | websocket | :lemon: |
| | [webtransport](https://github.com/libp2p/specs/tree/master/webtransport) | |
| | [webrtc-browser-to-server](https://github.com/libp2p/specs/blob/master/webrtc/webrtc-direct.md) | |
Expand Down
5 changes: 5 additions & 0 deletions libp2p/src/main/kotlin/io/libp2p/core/dsl/Builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ open class Builder {
*/
open fun transports(fn: TransportsBuilder.() -> Unit): Builder = apply { fn(transports) }

/**
* Manipulates the secure transports for this host.
*/
open fun secureTransports(fn: SecureTransportsBuilder.() -> Unit): Builder = apply { fn(secureTransports) }

/**
* [AddressBook] implementation
*/
Expand Down
11 changes: 7 additions & 4 deletions libp2p/src/main/kotlin/io/libp2p/etc/types/NettyExt.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ fun ChannelFuture.toCompletableFuture(): CompletableFuture<Channel> {
return ret
}

fun Future<*>.toVoidCompletableFuture(): CompletableFuture<Unit> {
val ret = CompletableFuture<Unit>()
fun Future<*>.toVoidCompletableFuture(): CompletableFuture<Unit> = toCompletableFuture().thenApply { }

fun <T> Future<T>.toCompletableFuture(): CompletableFuture<T> {
val ret = CompletableFuture<T>()
this.addListener {
if (it.isSuccess) {
ret.complete(Unit)
@Suppress("UNCHECKED_CAST")
ret.complete(it.get() as T)
} else {
ret.completeExceptionally(it.cause())
}
Expand All @@ -45,5 +48,5 @@ fun ChannelPipeline.getHandlerName(handler: ChannelHandler) = (
?: throw IllegalArgumentException("Handler $handler not found in pipeline $this")
)

fun ChannelPipeline.addAfter(handler: ChannelHandler, newHandlerName: String, newHandler: ChannelHandler) =
fun ChannelPipeline.addAfter(handler: ChannelHandler, newHandlerName: String, newHandler: ChannelHandler): ChannelPipeline =
addAfter(getHandlerName(handler), newHandlerName, newHandler)
10 changes: 1 addition & 9 deletions libp2p/src/main/kotlin/io/libp2p/etc/util/netty/NettyUtil.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package io.libp2p.etc.util.netty

import io.libp2p.etc.types.addAfter
import io.libp2p.etc.types.fromHex
import io.netty.channel.Channel
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelInitializer
import io.netty.util.internal.StringUtil

class NettyInit(val channel: Channel, val thisHandler: ChannelHandler) {
class NettyInit(val channel: Channel, thisHandler: ChannelHandler) {
private var lastLocalHandler = thisHandler
fun addLastLocal(handler: ChannelHandler) {
channel.pipeline().addAfter(lastLocalHandler, generateName(channel, handler), handler)
Expand All @@ -23,13 +22,6 @@ fun nettyInitializer(initer: (NettyInit) -> Unit): ChannelInitializer<Channel> {
}
}

private val regex = Regex("\\|[0-9a-fA-F]{8}\\| ")
fun String.fromLogHandler() = lines()
.filter { it.contains(regex) }
.map { it.substring(11, 59).replace(" ", "") }
.flatMap { it.fromHex().asList() }
.toByteArray()

private fun generateName(ch: Channel, handler: ChannelHandler): String {
val className = StringUtil.simpleClassName(handler.javaClass)
val names = ch.pipeline().names().toSet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ interface ProtocolMessageHandler<TMessage> {
fun onActivated(stream: Stream) = Unit
fun onMessage(stream: Stream, msg: TMessage) = Unit
fun onClosed(stream: Stream) = Unit
fun onReadClosed(stream: Stream) = Unit
fun onException(cause: Throwable?) = Unit

fun fireMessage(stream: Stream, msg: Any) {
@Suppress("UNCHECKED_CAST")
onMessage(stream, msg as TMessage)
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.libp2p.protocol

import io.libp2p.core.Stream
import io.libp2p.etc.util.netty.mux.RemoteWriteClosed
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.util.ReferenceCounted
Expand Down Expand Up @@ -33,7 +34,8 @@ class ProtocolMessageHandlerAdapter<TMessage>(
}

override fun channelRead0(ctx: ChannelHandlerContext?, msg: Any) {
pmh.fireMessage(stream, msg)
@Suppress("UNCHECKED_CAST")
pmh.onMessage(stream, msg as TMessage)
}

override fun channelUnregistered(ctx: ChannelHandlerContext?) {
Expand All @@ -44,6 +46,13 @@ class ProtocolMessageHandlerAdapter<TMessage>(
pmh.onException(cause)
}

override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
if (evt == RemoteWriteClosed) {
pmh.onReadClosed(stream)
}
super.userEventTriggered(ctx, evt)
}

// ///////////////////////
private fun refCount(obj: Any): Int {
return if (obj is ReferenceCounted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ fun buildTlsHandler(
handshakeComplete: CompletableFuture<SecureChannel.Session>,
ctx: ChannelHandlerContext
): SslHandler {
val connectionKeys = if (certAlgorithm.equals("ECDSA")) generateEcdsaKeyPair() else generateEd25519KeyPair()
val connectionKeys = if (certAlgorithm == "ECDSA") generateEcdsaKeyPair() else generateEd25519KeyPair()
val javaPrivateKey = getJavaKey(connectionKeys.first)
val sslContext = (
if (isInitiator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import io.libp2p.etc.types.toVoidCompletableFuture
import io.netty.channel.Channel
import java.util.concurrent.CompletableFuture

class StreamOverNetty(
open class StreamOverNetty(
ch: Channel,
override val connection: Connection,
initiator: Boolean
Expand Down
22 changes: 22 additions & 0 deletions libp2p/src/main/kotlin/io/libp2p/transport/quic/QuicStream.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.libp2p.transport.quic

import io.libp2p.core.Connection
import io.libp2p.etc.types.toVoidCompletableFuture
import io.libp2p.transport.implementation.StreamOverNetty
import io.netty.handler.codec.quic.QuicStreamChannel
import java.util.concurrent.CompletableFuture

class QuicStream(
val quicStreamChannel: QuicStreamChannel,
connection: Connection,
initiator: Boolean
) : StreamOverNetty(quicStreamChannel, connection, initiator) {

init {
pushHandler(QuicStreamReadCloseEventConverter())
}

override fun closeWrite(): CompletableFuture<Unit> {
return quicStreamChannel.shutdownOutput().toVoidCompletableFuture()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.libp2p.transport.quic

import io.libp2p.etc.util.netty.mux.RemoteWriteClosed
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.channel.socket.ChannelInputShutdownReadComplete

/**
* Convert QUIC library specific event on remote stream close to Libp2p specific event
*/
class QuicStreamReadCloseEventConverter : ChannelInboundHandlerAdapter() {

override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
ctx.fireUserEventTriggered(RemoteWriteClosed)
} else {
super.userEventTriggered(ctx, evt)
}
}
}
Loading
Loading