Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions examples/android-chatter/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ android {
}
}
packagingOptions {
exclude 'META-INF/io.netty.versions.properties'
exclude 'META-INF/INDEX.LIST'
exclude 'META-INF/versions/9/OSGI-INF/MANIFEST.MF'
exclude 'META-INF/license/LICENSE.aix-netbsd.txt'
exclude 'META-INF/native-image/io.netty.incubator/netty-incubator-codec-native-quic/jni-config.json'
exclude 'META-INF/native-image/io.netty.incubator/netty-incubator-codec-native-quic/reflect-config.json'
exclude 'META-INF/native-image/io.netty.incubator/netty-incubator-codec-native-quic/resource-config.json'
exclude 'META-INF/license/LICENSE.boringssl.txt'
exclude 'META-INF/license/LICENSE.mvn-wrapper.txt'
exclude 'META-INF/license/LICENSE.quiche.txt'
exclude 'META-INF/license/LICENSE.tomcat-native.txt'
resources {
excludes.add("META-INF/io.netty.versions.properties")
excludes.add("META-INF/INDEX.LIST")
excludes.add("META-INF/versions/9/OSGI-INF/MANIFEST.MF")
excludes.add("META-INF/native-image/io.netty/netty-codec-native-quic/jni-config.json")
excludes.add("META-INF/native-image/io.netty/netty-codec-native-quic/reflect-config.json")
excludes.add("META-INF/native-image/io.netty/netty-codec-native-quic/resource-config.json")
excludes.add("META-INF/native-image/io.netty/netty-codec-native-quic/native-image.properties")
excludes.add("META-INF/license/*")
}

}
kotlinOptions {
jvmTarget = "11"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import io.libp2p.protocol.Identify
import io.libp2p.protocol.Ping
import io.libp2p.security.noise.NoiseXXSecureChannel
import io.libp2p.security.tls.TlsSecureChannel.Companion.ECDSA
import io.libp2p.transport.quic.QuicTransport
import io.libp2p.transport.tcp.TcpTransport
import redis.clients.jedis.Jedis
import java.util.concurrent.CompletableFuture
Expand Down Expand Up @@ -72,7 +73,7 @@ class InteropTestAgent(val params: InteropTestParams) {
): Host = hostJ(Builder.Defaults.None, fn = {
it.identity.factory = { privateKey }
if (params.transport == "quic-v1") {
// TODO add quic support
it.secureTransports.add(QuicTransport::ECDSA)
} else {
it.transports.add(::TcpTransport)
}
Expand Down
23 changes: 12 additions & 11 deletions libp2p/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,20 @@ dependencies {
api("io.netty:netty-transport")
implementation("io.netty:netty-handler")
implementation("io.netty:netty-codec-http")
implementation("io.netty:netty-codec-protobuf")
implementation("io.netty:netty-transport-classes-epoll")
implementation("io.netty.incubator:netty-incubator-codec-native-quic")
implementation("io.netty:netty-codec-native-quic")
// OS-specific bindings
implementation("io.netty.incubator:netty-incubator-codec-native-quic::linux-x86_64")
implementation("io.netty.incubator:netty-incubator-codec-native-quic::linux-aarch_64")
implementation("io.netty.incubator:netty-incubator-codec-native-quic::osx-x86_64")
implementation("io.netty.incubator:netty-incubator-codec-native-quic::osx-aarch_64")
implementation("io.netty.incubator:netty-incubator-codec-native-quic::windows-x86_64")
implementation("io.netty:netty-tcnative-boringssl-static:2.0.70.Final:linux-x86_64")
implementation("io.netty:netty-tcnative-boringssl-static:2.0.70.Final:linux-aarch_64")
implementation("io.netty:netty-tcnative-boringssl-static:2.0.70.Final:osx-x86_64")
implementation("io.netty:netty-tcnative-boringssl-static:2.0.70.Final:osx-aarch_64")
implementation("io.netty:netty-tcnative-boringssl-static:2.0.70.Final:windows-x86_64")
implementation("io.netty:netty-codec-native-quic::linux-x86_64")
implementation("io.netty:netty-codec-native-quic::linux-aarch_64")
implementation("io.netty:netty-codec-native-quic::osx-x86_64")
implementation("io.netty:netty-codec-native-quic::osx-aarch_64")
implementation("io.netty:netty-codec-native-quic::windows-x86_64")
implementation("io.netty:netty-tcnative-boringssl-static::linux-x86_64")
implementation("io.netty:netty-tcnative-boringssl-static::linux-aarch_64")
implementation("io.netty:netty-tcnative-boringssl-static::osx-x86_64")
implementation("io.netty:netty-tcnative-boringssl-static::osx-aarch_64")
implementation("io.netty:netty-tcnative-boringssl-static::windows-x86_64")

api("com.google.protobuf:protobuf-java")

Expand Down
13 changes: 13 additions & 0 deletions libp2p/src/main/kotlin/io/libp2p/etc/types/NettyExt.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import io.netty.channel.Channel
import io.netty.channel.ChannelFuture
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelPipeline
import io.netty.util.concurrent.Future
import java.util.concurrent.CompletableFuture

fun ChannelFuture.toVoidCompletableFuture(): CompletableFuture<Unit> = toCompletableFuture().thenApply { }
Expand All @@ -20,6 +21,18 @@ fun ChannelFuture.toCompletableFuture(): CompletableFuture<Channel> {
return ret
}

fun Future<*>.toVoidCompletableFuture(): CompletableFuture<Unit> {
val ret = CompletableFuture<Unit>()
this.addListener {
if (it.isSuccess) {
ret.complete(Unit)
} else {
ret.completeExceptionally(it.cause())
}
}
return ret
}

fun ChannelPipeline.replace(oldHandler: ChannelHandler, newHandlers: List<Pair<String, ChannelHandler>>) {
replace(oldHandler, newHandlers[0].first, newHandlers[0].second)
for (i in 1 until newHandlers.size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.Channel
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelOption
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.MultiThreadIoEventLoopGroup
import io.netty.channel.nio.NioIoHandler
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import java.net.InetSocketAddress
Expand All @@ -39,8 +40,12 @@ abstract class PlainNettyTransport(
private val listeners = mutableMapOf<Multiaddr, Channel>()
private val channels = mutableListOf<Channel>()

private var workerGroup by lazyVar { NioEventLoopGroup() }
private var bossGroup by lazyVar { NioEventLoopGroup(1) }
private var workerGroup by lazyVar {
MultiThreadIoEventLoopGroup(NioIoHandler.newFactory())
}
private var bossGroup by lazyVar {
MultiThreadIoEventLoopGroup(1, NioIoHandler.newFactory())
}

private var client by lazyVar {
Bootstrap().apply {
Expand Down Expand Up @@ -85,10 +90,11 @@ abstract class PlainNettyTransport(
val everythingThatNeedsToClose = unbindsCompleted.union(channelsClosed)
val allClosed = CompletableFuture.allOf(*everythingThatNeedsToClose.toTypedArray())

return allClosed.thenApply {
workerGroup.shutdownGracefully()
bossGroup.shutdownGracefully()
Unit
return allClosed.thenCompose {
CompletableFuture.allOf(
workerGroup.shutdownGracefully().toVoidCompletableFuture(),
bossGroup.shutdownGracefully().toVoidCompletableFuture()
).thenApply { }
}
} // close

Expand Down
42 changes: 23 additions & 19 deletions libp2p/src/main/kotlin/io/libp2p/transport/quic/QuicTransport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ import io.libp2p.etc.STREAM
import io.libp2p.etc.types.*
import io.libp2p.etc.util.MultiaddrUtils
import io.libp2p.etc.util.netty.nettyInitializer
import io.libp2p.security.tls.*
import io.libp2p.security.tls.Libp2pTrustManager
import io.libp2p.security.tls.buildCert
import io.libp2p.security.tls.getJavaKey
import io.libp2p.security.tls.getPublicKeyFromCert
import io.libp2p.security.tls.verifyAndExtractPeerId
import io.libp2p.transport.implementation.ConnectionOverNetty
import io.libp2p.transport.implementation.NettyTransport
import io.libp2p.transport.implementation.StreamOverNetty
Expand All @@ -29,12 +33,13 @@ import io.netty.buffer.PooledByteBufAllocator
import io.netty.channel.*
import io.netty.channel.epoll.Epoll
import io.netty.channel.epoll.EpollDatagramChannel
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.nio.NioIoHandler
import io.netty.channel.socket.nio.NioDatagramChannel
import io.netty.handler.codec.quic.*
import io.netty.handler.ssl.ClientAuth
import io.netty.incubator.codec.quic.*
import org.slf4j.LoggerFactory
import java.net.*
import java.net.InetSocketAddress
import java.net.SocketAddress
import java.time.Duration
import java.util.*
import java.util.concurrent.CompletableFuture
Expand All @@ -45,16 +50,18 @@ class QuicTransport(
private val certAlgorithm: String,
private val protocols: List<ProtocolBinding<*>>
) : NettyTransport {
private val log = LoggerFactory.getLogger(QuicTransport::class.java)

private val logger = LoggerFactory.getLogger(QuicTransport::class.java)

private var closed = false
var connectTimeout = Duration.ofSeconds(15)

private val listeners = mutableMapOf<Multiaddr, Channel>()
private val channels = mutableListOf<Channel>()

private var workerGroup by lazyVar { NioEventLoopGroup() }
private var bossGroup by lazyVar { NioEventLoopGroup(1) }
private var workerGroup by lazyVar {
MultiThreadIoEventLoopGroup(NioIoHandler.newFactory())
}
private var allocator by lazyVar { PooledByteBufAllocator(true) }
private var multistreamProtocol: MultistreamProtocol = MultistreamProtocolV1
private var incomingMultistreamProtocol: MultistreamProtocol by lazyVar { multistreamProtocol }
Expand All @@ -78,7 +85,7 @@ class QuicTransport(
}

@JvmStatic
fun Ecdsa(k: PrivKey, p: List<ProtocolBinding<*>>): QuicTransport {
fun ECDSA(k: PrivKey, p: List<ProtocolBinding<*>>): QuicTransport {
return QuicTransport(k, "ECDSA", p)
}

Expand Down Expand Up @@ -128,10 +135,8 @@ class QuicTransport(
val everythingThatNeedsToClose = unbindsCompleted.union(channelsClosed)
val allClosed = CompletableFuture.allOf(*everythingThatNeedsToClose.toTypedArray())

return allClosed.thenApply {
workerGroup.shutdownGracefully()
bossGroup.shutdownGracefully()
Unit
return allClosed.thenCompose {
workerGroup.shutdownGracefully().toVoidCompletableFuture()
}
}

Expand Down Expand Up @@ -164,7 +169,7 @@ class QuicTransport(
listeners -= addr
}
}
log.info("Quic server listening on {}", addr)
logger.info("Quic server listening on {}", addr)
res.complete(null)
}
}
Expand Down Expand Up @@ -208,7 +213,6 @@ class QuicTransport(
.option(ChannelOption.AUTO_READ, true)
.option(ChannelOption.ALLOCATOR, allocator)
.remoteAddress(fromMultiaddr(addr))
// .handler(connHandler)
.streamHandler(object : ChannelInboundHandlerAdapter() {
override fun handlerAdded(ctx: ChannelHandlerContext?) {
val connection = ctx!!.channel().parent().attr(CONNECTION).get() as Connection
Expand Down Expand Up @@ -285,7 +289,7 @@ class QuicTransport(
val javaPrivateKey = getJavaKey(connectionKeys.first)
val isClient = expectedRemotePeerId != null
val cert = buildCert(localKey, connectionKeys.first)
log.info("Building {} keys and cert for peerid {}", certAlgorithm, PeerId.fromPubKey(localKey.publicKey()))
logger.info("Building {} keys and cert for peer id {}", certAlgorithm, PeerId.fromPubKey(localKey.publicKey()))
return (
if (isClient) {
QuicSslContextBuilder.forClient().keyManager(javaPrivateKey, null, cert)
Expand Down Expand Up @@ -326,7 +330,7 @@ class QuicTransport(
val remotePeerId = verifyAndExtractPeerId(arrayOf(remoteCert))
val remotePublicKey = getPublicKeyFromCert(arrayOf(remoteCert))

log.info("Handshake completed with remote peer id: {}", remotePeerId)
logger.info("Handshake completed with remote peer id: {}", remotePeerId)

connection.setSecureSession(
SecureChannel.Session(
Expand Down Expand Up @@ -354,7 +358,7 @@ class QuicTransport(

@Deprecated("Deprecated in Java")
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
log.error("An error during handshake", cause)
logger.error("An error during handshake", cause)
ctx.close()
}
}
Expand All @@ -374,8 +378,8 @@ class QuicTransport(
val connection: ConnectionOverNetty
) : StreamMuxer.Session {
override fun <T> createStream(protocols: List<ProtocolBinding<T>>): StreamPromise<T> {
var multistreamProtocol: MultistreamProtocol = MultistreamProtocolV1
var streamMultistreamProtocol: MultistreamProtocol by lazyVar { multistreamProtocol }
val multistreamProtocol: MultistreamProtocol = MultistreamProtocolV1
val streamMultistreamProtocol: MultistreamProtocol by lazyVar { multistreamProtocol }
val multi = streamMultistreamProtocol.createMultistream(protocols)

val controller = CompletableFuture<T>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ internal class WebSocketClientInitializer(
private val url: String
) : ChannelInitializer<SocketChannel>() {

public override fun initChannel(ch: SocketChannel) {
override fun initChannel(ch: SocketChannel) {
val pipeline = ch.pipeline()

pipeline.addLast(HttpClientCodec())
pipeline.addLast(HttpObjectAggregator(65536))
pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE)
pipeline.addLast(WebSocketClientCompressionHandler(0))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it safe to give ulimited buffer here and in WebSocketServerInitializer.kt?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This retains the same behaviour as before, so decided to not touch it, not sure entirely how safe it is.

pipeline.addLast(
WebSocketClientHandshake(
connectionBuilder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ internal class WebSocketServerInitializer(
private val connectionBuilder: ChannelHandler
) : ChannelInitializer<SocketChannel>() {

public override fun initChannel(ch: SocketChannel) {
override fun initChannel(ch: SocketChannel) {
val pipeline = ch.pipeline()

pipeline.addLast(HttpServerCodec())
pipeline.addLast(HttpObjectAggregator(65536))
pipeline.addLast(WebSocketServerCompressionHandler())
pipeline.addLast(WebSocketServerCompressionHandler(0))
pipeline.addLast(WebSocketServerProtocolHandler("/", null, true))
pipeline.addLast(WebSocketServerHandshakeListener(connectionBuilder))
} // initChannel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ void pingKubo() throws Exception {
PeerId peerId = PeerId.fromBase58(getKuboPeerId());

Host clientHost =
new HostBuilder().keyType(KeyType.ED25519).secureTransport(QuicTransport::Ecdsa).build();
new HostBuilder().keyType(KeyType.ED25519).secureTransport(QuicTransport::ECDSA).build();

CompletableFuture<Void> clientStarted = clientHost.start();
clientStarted.get(5, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ void pingJava() throws Exception {
Host clientHost =
new HostBuilder()
.keyType(KeyType.ED25519)
.secureTransport(QuicTransport::Ecdsa)
.secureTransport(QuicTransport::ECDSA)
.transport(TcpTransport::new)
.secureChannel(TlsSecureChannel::ECDSA)
.muxer(StreamMuxerProtocol::getYamux)
Expand All @@ -41,7 +41,7 @@ void pingJava() throws Exception {
Host serverHost =
new HostBuilder()
.keyType(KeyType.ED25519)
.secureTransport(QuicTransport::Ecdsa)
.secureTransport(QuicTransport::ECDSA)
.transport(TcpTransport::new)
.secureChannel(TlsSecureChannel::ECDSA)
.muxer(StreamMuxerProtocol::getYamux)
Expand Down Expand Up @@ -105,7 +105,7 @@ void tlsAndQuicInSameHostPing() throws Exception {
Host clientHost =
new HostBuilder()
.keyType(KeyType.ED25519)
.secureTransport(QuicTransport::Ecdsa)
.secureTransport(QuicTransport::ECDSA)
.transport(TcpTransport::new)
.secureChannel(TlsSecureChannel::ECDSA)
.secureChannel(NoiseXXSecureChannel::new)
Expand All @@ -115,7 +115,7 @@ void tlsAndQuicInSameHostPing() throws Exception {
Host serverHost =
new HostBuilder()
.keyType(KeyType.ED25519)
.secureTransport(QuicTransport::Ecdsa)
.secureTransport(QuicTransport::ECDSA)
.transport(TcpTransport::new)
.secureChannel(TlsSecureChannel::ECDSA)
.secureChannel(NoiseXXSecureChannel::new)
Expand Down Expand Up @@ -201,15 +201,15 @@ void largeBlob() throws Exception {
Host clientHost =
new HostBuilder()
.keyType(KeyType.ED25519)
.secureTransport(QuicTransport::Ecdsa)
.secureTransport(QuicTransport::ECDSA)
.builderModifier(
b -> b.getDebug().getMuxFramesHandler().addCompactLogger(LogLevel.ERROR, "client"))
.build();

Host serverHost =
new HostBuilder()
.keyType(KeyType.ED25519)
.secureTransport(QuicTransport::Ecdsa)
.secureTransport(QuicTransport::ECDSA)
.protocol(new Blob(blobSize))
.listen(localListenAddress)
.builderModifier(
Expand Down Expand Up @@ -262,12 +262,12 @@ void startHostAddPing() throws Exception {
String localListenAddress = "/ip4/127.0.0.1/udp/" + getPort() + "/quic-v1";

Host clientHost =
new HostBuilder().keyType(KeyType.ED25519).secureTransport(QuicTransport::Ecdsa).build();
new HostBuilder().keyType(KeyType.ED25519).secureTransport(QuicTransport::ECDSA).build();

Host serverHost =
new HostBuilder()
.keyType(KeyType.ED25519)
.secureTransport(QuicTransport::Ecdsa)
.secureTransport(QuicTransport::ECDSA)
.listen(localListenAddress)
.build();

Expand Down
Loading
Loading