diff --git a/examples/android-chatter/build.gradle b/examples/android-chatter/build.gradle index f354ded48..fa37a35bd 100644 --- a/examples/android-chatter/build.gradle +++ b/examples/android-chatter/build.gradle @@ -22,17 +22,17 @@ android { } } packagingOptions { - exclude 'META-INF/io.netty.versions.properties' - exclude 'META-INF/INDEX.LIST' - exclude 'META-INF/versions/9/OSGI-INF/MANIFEST.MF' - exclude 'META-INF/license/LICENSE.aix-netbsd.txt' - exclude 'META-INF/native-image/io.netty.incubator/netty-incubator-codec-native-quic/jni-config.json' - exclude 'META-INF/native-image/io.netty.incubator/netty-incubator-codec-native-quic/reflect-config.json' - exclude 'META-INF/native-image/io.netty.incubator/netty-incubator-codec-native-quic/resource-config.json' - exclude 'META-INF/license/LICENSE.boringssl.txt' - exclude 'META-INF/license/LICENSE.mvn-wrapper.txt' - exclude 'META-INF/license/LICENSE.quiche.txt' - exclude 'META-INF/license/LICENSE.tomcat-native.txt' + resources { + excludes.add("META-INF/io.netty.versions.properties") + excludes.add("META-INF/INDEX.LIST") + excludes.add("META-INF/versions/9/OSGI-INF/MANIFEST.MF") + excludes.add("META-INF/native-image/io.netty/netty-codec-native-quic/jni-config.json") + excludes.add("META-INF/native-image/io.netty/netty-codec-native-quic/reflect-config.json") + excludes.add("META-INF/native-image/io.netty/netty-codec-native-quic/resource-config.json") + excludes.add("META-INF/native-image/io.netty/netty-codec-native-quic/native-image.properties") + excludes.add("META-INF/license/*") + } + } kotlinOptions { jvmTarget = "11" diff --git a/interop-test-client/src/main/java/io/libp2p/interop/InteropTestAgent.kt b/interop-test-client/src/main/java/io/libp2p/interop/InteropTestAgent.kt index 45ec9a9d5..831e3b94c 100644 --- a/interop-test-client/src/main/java/io/libp2p/interop/InteropTestAgent.kt +++ b/interop-test-client/src/main/java/io/libp2p/interop/InteropTestAgent.kt @@ -19,6 +19,7 @@ import io.libp2p.protocol.Identify import io.libp2p.protocol.Ping import io.libp2p.security.noise.NoiseXXSecureChannel import io.libp2p.security.tls.TlsSecureChannel.Companion.ECDSA +import io.libp2p.transport.quic.QuicTransport import io.libp2p.transport.tcp.TcpTransport import redis.clients.jedis.Jedis import java.util.concurrent.CompletableFuture @@ -72,7 +73,7 @@ class InteropTestAgent(val params: InteropTestParams) { ): Host = hostJ(Builder.Defaults.None, fn = { it.identity.factory = { privateKey } if (params.transport == "quic-v1") { - // TODO add quic support + it.secureTransports.add(QuicTransport::ECDSA) } else { it.transports.add(::TcpTransport) } diff --git a/libp2p/build.gradle.kts b/libp2p/build.gradle.kts index 44b9a74b2..26fa61f56 100644 --- a/libp2p/build.gradle.kts +++ b/libp2p/build.gradle.kts @@ -14,19 +14,20 @@ dependencies { api("io.netty:netty-transport") implementation("io.netty:netty-handler") implementation("io.netty:netty-codec-http") + implementation("io.netty:netty-codec-protobuf") implementation("io.netty:netty-transport-classes-epoll") - implementation("io.netty.incubator:netty-incubator-codec-native-quic") + implementation("io.netty:netty-codec-native-quic") // OS-specific bindings - implementation("io.netty.incubator:netty-incubator-codec-native-quic::linux-x86_64") - implementation("io.netty.incubator:netty-incubator-codec-native-quic::linux-aarch_64") - implementation("io.netty.incubator:netty-incubator-codec-native-quic::osx-x86_64") - implementation("io.netty.incubator:netty-incubator-codec-native-quic::osx-aarch_64") - implementation("io.netty.incubator:netty-incubator-codec-native-quic::windows-x86_64") - implementation("io.netty:netty-tcnative-boringssl-static:2.0.70.Final:linux-x86_64") - implementation("io.netty:netty-tcnative-boringssl-static:2.0.70.Final:linux-aarch_64") - implementation("io.netty:netty-tcnative-boringssl-static:2.0.70.Final:osx-x86_64") - implementation("io.netty:netty-tcnative-boringssl-static:2.0.70.Final:osx-aarch_64") - implementation("io.netty:netty-tcnative-boringssl-static:2.0.70.Final:windows-x86_64") + implementation("io.netty:netty-codec-native-quic::linux-x86_64") + implementation("io.netty:netty-codec-native-quic::linux-aarch_64") + implementation("io.netty:netty-codec-native-quic::osx-x86_64") + implementation("io.netty:netty-codec-native-quic::osx-aarch_64") + implementation("io.netty:netty-codec-native-quic::windows-x86_64") + implementation("io.netty:netty-tcnative-boringssl-static::linux-x86_64") + implementation("io.netty:netty-tcnative-boringssl-static::linux-aarch_64") + implementation("io.netty:netty-tcnative-boringssl-static::osx-x86_64") + implementation("io.netty:netty-tcnative-boringssl-static::osx-aarch_64") + implementation("io.netty:netty-tcnative-boringssl-static::windows-x86_64") api("com.google.protobuf:protobuf-java") diff --git a/libp2p/src/main/kotlin/io/libp2p/etc/types/NettyExt.kt b/libp2p/src/main/kotlin/io/libp2p/etc/types/NettyExt.kt index e4c9c1d49..bf46b2157 100644 --- a/libp2p/src/main/kotlin/io/libp2p/etc/types/NettyExt.kt +++ b/libp2p/src/main/kotlin/io/libp2p/etc/types/NettyExt.kt @@ -4,6 +4,7 @@ import io.netty.channel.Channel import io.netty.channel.ChannelFuture import io.netty.channel.ChannelHandler import io.netty.channel.ChannelPipeline +import io.netty.util.concurrent.Future import java.util.concurrent.CompletableFuture fun ChannelFuture.toVoidCompletableFuture(): CompletableFuture = toCompletableFuture().thenApply { } @@ -20,6 +21,18 @@ fun ChannelFuture.toCompletableFuture(): CompletableFuture { return ret } +fun Future<*>.toVoidCompletableFuture(): CompletableFuture { + val ret = CompletableFuture() + this.addListener { + if (it.isSuccess) { + ret.complete(Unit) + } else { + ret.completeExceptionally(it.cause()) + } + } + return ret +} + fun ChannelPipeline.replace(oldHandler: ChannelHandler, newHandlers: List>) { replace(oldHandler, newHandlers[0].first, newHandlers[0].second) for (i in 1 until newHandlers.size) { diff --git a/libp2p/src/main/kotlin/io/libp2p/transport/implementation/PlainNettyTransport.kt b/libp2p/src/main/kotlin/io/libp2p/transport/implementation/PlainNettyTransport.kt index 96931e4ec..8d135fa9e 100644 --- a/libp2p/src/main/kotlin/io/libp2p/transport/implementation/PlainNettyTransport.kt +++ b/libp2p/src/main/kotlin/io/libp2p/transport/implementation/PlainNettyTransport.kt @@ -19,7 +19,8 @@ import io.netty.bootstrap.ServerBootstrap import io.netty.channel.Channel import io.netty.channel.ChannelHandler import io.netty.channel.ChannelOption -import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.MultiThreadIoEventLoopGroup +import io.netty.channel.nio.NioIoHandler import io.netty.channel.socket.nio.NioServerSocketChannel import io.netty.channel.socket.nio.NioSocketChannel import java.net.InetSocketAddress @@ -39,8 +40,12 @@ abstract class PlainNettyTransport( private val listeners = mutableMapOf() private val channels = mutableListOf() - private var workerGroup by lazyVar { NioEventLoopGroup() } - private var bossGroup by lazyVar { NioEventLoopGroup(1) } + private var workerGroup by lazyVar { + MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()) + } + private var bossGroup by lazyVar { + MultiThreadIoEventLoopGroup(1, NioIoHandler.newFactory()) + } private var client by lazyVar { Bootstrap().apply { @@ -85,10 +90,11 @@ abstract class PlainNettyTransport( val everythingThatNeedsToClose = unbindsCompleted.union(channelsClosed) val allClosed = CompletableFuture.allOf(*everythingThatNeedsToClose.toTypedArray()) - return allClosed.thenApply { - workerGroup.shutdownGracefully() - bossGroup.shutdownGracefully() - Unit + return allClosed.thenCompose { + CompletableFuture.allOf( + workerGroup.shutdownGracefully().toVoidCompletableFuture(), + bossGroup.shutdownGracefully().toVoidCompletableFuture() + ).thenApply { } } } // close diff --git a/libp2p/src/main/kotlin/io/libp2p/transport/quic/QuicTransport.kt b/libp2p/src/main/kotlin/io/libp2p/transport/quic/QuicTransport.kt index 63d448de3..c4f1a39d0 100644 --- a/libp2p/src/main/kotlin/io/libp2p/transport/quic/QuicTransport.kt +++ b/libp2p/src/main/kotlin/io/libp2p/transport/quic/QuicTransport.kt @@ -19,7 +19,11 @@ import io.libp2p.etc.STREAM import io.libp2p.etc.types.* import io.libp2p.etc.util.MultiaddrUtils import io.libp2p.etc.util.netty.nettyInitializer -import io.libp2p.security.tls.* +import io.libp2p.security.tls.Libp2pTrustManager +import io.libp2p.security.tls.buildCert +import io.libp2p.security.tls.getJavaKey +import io.libp2p.security.tls.getPublicKeyFromCert +import io.libp2p.security.tls.verifyAndExtractPeerId import io.libp2p.transport.implementation.ConnectionOverNetty import io.libp2p.transport.implementation.NettyTransport import io.libp2p.transport.implementation.StreamOverNetty @@ -29,12 +33,13 @@ import io.netty.buffer.PooledByteBufAllocator import io.netty.channel.* import io.netty.channel.epoll.Epoll import io.netty.channel.epoll.EpollDatagramChannel -import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.nio.NioIoHandler import io.netty.channel.socket.nio.NioDatagramChannel +import io.netty.handler.codec.quic.* import io.netty.handler.ssl.ClientAuth -import io.netty.incubator.codec.quic.* import org.slf4j.LoggerFactory -import java.net.* +import java.net.InetSocketAddress +import java.net.SocketAddress import java.time.Duration import java.util.* import java.util.concurrent.CompletableFuture @@ -45,7 +50,8 @@ class QuicTransport( private val certAlgorithm: String, private val protocols: List> ) : NettyTransport { - private val log = LoggerFactory.getLogger(QuicTransport::class.java) + + private val logger = LoggerFactory.getLogger(QuicTransport::class.java) private var closed = false var connectTimeout = Duration.ofSeconds(15) @@ -53,8 +59,9 @@ class QuicTransport( private val listeners = mutableMapOf() private val channels = mutableListOf() - private var workerGroup by lazyVar { NioEventLoopGroup() } - private var bossGroup by lazyVar { NioEventLoopGroup(1) } + private var workerGroup by lazyVar { + MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()) + } private var allocator by lazyVar { PooledByteBufAllocator(true) } private var multistreamProtocol: MultistreamProtocol = MultistreamProtocolV1 private var incomingMultistreamProtocol: MultistreamProtocol by lazyVar { multistreamProtocol } @@ -78,7 +85,7 @@ class QuicTransport( } @JvmStatic - fun Ecdsa(k: PrivKey, p: List>): QuicTransport { + fun ECDSA(k: PrivKey, p: List>): QuicTransport { return QuicTransport(k, "ECDSA", p) } @@ -128,10 +135,8 @@ class QuicTransport( val everythingThatNeedsToClose = unbindsCompleted.union(channelsClosed) val allClosed = CompletableFuture.allOf(*everythingThatNeedsToClose.toTypedArray()) - return allClosed.thenApply { - workerGroup.shutdownGracefully() - bossGroup.shutdownGracefully() - Unit + return allClosed.thenCompose { + workerGroup.shutdownGracefully().toVoidCompletableFuture() } } @@ -164,7 +169,7 @@ class QuicTransport( listeners -= addr } } - log.info("Quic server listening on {}", addr) + logger.info("Quic server listening on {}", addr) res.complete(null) } } @@ -208,7 +213,6 @@ class QuicTransport( .option(ChannelOption.AUTO_READ, true) .option(ChannelOption.ALLOCATOR, allocator) .remoteAddress(fromMultiaddr(addr)) -// .handler(connHandler) .streamHandler(object : ChannelInboundHandlerAdapter() { override fun handlerAdded(ctx: ChannelHandlerContext?) { val connection = ctx!!.channel().parent().attr(CONNECTION).get() as Connection @@ -285,7 +289,7 @@ class QuicTransport( val javaPrivateKey = getJavaKey(connectionKeys.first) val isClient = expectedRemotePeerId != null val cert = buildCert(localKey, connectionKeys.first) - log.info("Building {} keys and cert for peerid {}", certAlgorithm, PeerId.fromPubKey(localKey.publicKey())) + logger.info("Building {} keys and cert for peer id {}", certAlgorithm, PeerId.fromPubKey(localKey.publicKey())) return ( if (isClient) { QuicSslContextBuilder.forClient().keyManager(javaPrivateKey, null, cert) @@ -326,7 +330,7 @@ class QuicTransport( val remotePeerId = verifyAndExtractPeerId(arrayOf(remoteCert)) val remotePublicKey = getPublicKeyFromCert(arrayOf(remoteCert)) - log.info("Handshake completed with remote peer id: {}", remotePeerId) + logger.info("Handshake completed with remote peer id: {}", remotePeerId) connection.setSecureSession( SecureChannel.Session( @@ -354,7 +358,7 @@ class QuicTransport( @Deprecated("Deprecated in Java") override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { - log.error("An error during handshake", cause) + logger.error("An error during handshake", cause) ctx.close() } } @@ -374,8 +378,8 @@ class QuicTransport( val connection: ConnectionOverNetty ) : StreamMuxer.Session { override fun createStream(protocols: List>): StreamPromise { - var multistreamProtocol: MultistreamProtocol = MultistreamProtocolV1 - var streamMultistreamProtocol: MultistreamProtocol by lazyVar { multistreamProtocol } + val multistreamProtocol: MultistreamProtocol = MultistreamProtocolV1 + val streamMultistreamProtocol: MultistreamProtocol by lazyVar { multistreamProtocol } val multi = streamMultistreamProtocol.createMultistream(protocols) val controller = CompletableFuture() diff --git a/libp2p/src/main/kotlin/io/libp2p/transport/ws/WebSocketClientInitializer.kt b/libp2p/src/main/kotlin/io/libp2p/transport/ws/WebSocketClientInitializer.kt index 1fde9d5c4..d38db14ae 100644 --- a/libp2p/src/main/kotlin/io/libp2p/transport/ws/WebSocketClientInitializer.kt +++ b/libp2p/src/main/kotlin/io/libp2p/transport/ws/WebSocketClientInitializer.kt @@ -12,12 +12,12 @@ internal class WebSocketClientInitializer( private val url: String ) : ChannelInitializer() { - public override fun initChannel(ch: SocketChannel) { + override fun initChannel(ch: SocketChannel) { val pipeline = ch.pipeline() pipeline.addLast(HttpClientCodec()) pipeline.addLast(HttpObjectAggregator(65536)) - pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE) + pipeline.addLast(WebSocketClientCompressionHandler(0)) pipeline.addLast( WebSocketClientHandshake( connectionBuilder, diff --git a/libp2p/src/main/kotlin/io/libp2p/transport/ws/WebSocketServerInitializer.kt b/libp2p/src/main/kotlin/io/libp2p/transport/ws/WebSocketServerInitializer.kt index f1a195ef4..0665aad98 100644 --- a/libp2p/src/main/kotlin/io/libp2p/transport/ws/WebSocketServerInitializer.kt +++ b/libp2p/src/main/kotlin/io/libp2p/transport/ws/WebSocketServerInitializer.kt @@ -12,12 +12,12 @@ internal class WebSocketServerInitializer( private val connectionBuilder: ChannelHandler ) : ChannelInitializer() { - public override fun initChannel(ch: SocketChannel) { + override fun initChannel(ch: SocketChannel) { val pipeline = ch.pipeline() pipeline.addLast(HttpServerCodec()) pipeline.addLast(HttpObjectAggregator(65536)) - pipeline.addLast(WebSocketServerCompressionHandler()) + pipeline.addLast(WebSocketServerCompressionHandler(0)) pipeline.addLast(WebSocketServerProtocolHandler("/", null, true)) pipeline.addLast(WebSocketServerHandshakeListener(connectionBuilder)) } // initChannel diff --git a/libp2p/src/test/java/io/libp2p/transport/quic/QuicKuboTestJava.java b/libp2p/src/test/java/io/libp2p/transport/quic/QuicKuboTestJava.java index bb408db73..c597eb8ba 100644 --- a/libp2p/src/test/java/io/libp2p/transport/quic/QuicKuboTestJava.java +++ b/libp2p/src/test/java/io/libp2p/transport/quic/QuicKuboTestJava.java @@ -21,7 +21,7 @@ void pingKubo() throws Exception { PeerId peerId = PeerId.fromBase58(getKuboPeerId()); Host clientHost = - new HostBuilder().keyType(KeyType.ED25519).secureTransport(QuicTransport::Ecdsa).build(); + new HostBuilder().keyType(KeyType.ED25519).secureTransport(QuicTransport::ECDSA).build(); CompletableFuture clientStarted = clientHost.start(); clientStarted.get(5, TimeUnit.SECONDS); diff --git a/libp2p/src/test/java/io/libp2p/transport/quic/QuicServerTestJava.java b/libp2p/src/test/java/io/libp2p/transport/quic/QuicServerTestJava.java index 0fb95b198..37f72be99 100644 --- a/libp2p/src/test/java/io/libp2p/transport/quic/QuicServerTestJava.java +++ b/libp2p/src/test/java/io/libp2p/transport/quic/QuicServerTestJava.java @@ -32,7 +32,7 @@ void pingJava() throws Exception { Host clientHost = new HostBuilder() .keyType(KeyType.ED25519) - .secureTransport(QuicTransport::Ecdsa) + .secureTransport(QuicTransport::ECDSA) .transport(TcpTransport::new) .secureChannel(TlsSecureChannel::ECDSA) .muxer(StreamMuxerProtocol::getYamux) @@ -41,7 +41,7 @@ void pingJava() throws Exception { Host serverHost = new HostBuilder() .keyType(KeyType.ED25519) - .secureTransport(QuicTransport::Ecdsa) + .secureTransport(QuicTransport::ECDSA) .transport(TcpTransport::new) .secureChannel(TlsSecureChannel::ECDSA) .muxer(StreamMuxerProtocol::getYamux) @@ -105,7 +105,7 @@ void tlsAndQuicInSameHostPing() throws Exception { Host clientHost = new HostBuilder() .keyType(KeyType.ED25519) - .secureTransport(QuicTransport::Ecdsa) + .secureTransport(QuicTransport::ECDSA) .transport(TcpTransport::new) .secureChannel(TlsSecureChannel::ECDSA) .secureChannel(NoiseXXSecureChannel::new) @@ -115,7 +115,7 @@ void tlsAndQuicInSameHostPing() throws Exception { Host serverHost = new HostBuilder() .keyType(KeyType.ED25519) - .secureTransport(QuicTransport::Ecdsa) + .secureTransport(QuicTransport::ECDSA) .transport(TcpTransport::new) .secureChannel(TlsSecureChannel::ECDSA) .secureChannel(NoiseXXSecureChannel::new) @@ -201,7 +201,7 @@ void largeBlob() throws Exception { Host clientHost = new HostBuilder() .keyType(KeyType.ED25519) - .secureTransport(QuicTransport::Ecdsa) + .secureTransport(QuicTransport::ECDSA) .builderModifier( b -> b.getDebug().getMuxFramesHandler().addCompactLogger(LogLevel.ERROR, "client")) .build(); @@ -209,7 +209,7 @@ void largeBlob() throws Exception { Host serverHost = new HostBuilder() .keyType(KeyType.ED25519) - .secureTransport(QuicTransport::Ecdsa) + .secureTransport(QuicTransport::ECDSA) .protocol(new Blob(blobSize)) .listen(localListenAddress) .builderModifier( @@ -262,12 +262,12 @@ void startHostAddPing() throws Exception { String localListenAddress = "/ip4/127.0.0.1/udp/" + getPort() + "/quic-v1"; Host clientHost = - new HostBuilder().keyType(KeyType.ED25519).secureTransport(QuicTransport::Ecdsa).build(); + new HostBuilder().keyType(KeyType.ED25519).secureTransport(QuicTransport::ECDSA).build(); Host serverHost = new HostBuilder() .keyType(KeyType.ED25519) - .secureTransport(QuicTransport::Ecdsa) + .secureTransport(QuicTransport::ECDSA) .listen(localListenAddress) .build(); diff --git a/libp2p/src/testFixtures/kotlin/io/libp2p/tools/TCPProxy.kt b/libp2p/src/testFixtures/kotlin/io/libp2p/tools/TCPProxy.kt index eb77980d7..6cb5c2d7b 100644 --- a/libp2p/src/testFixtures/kotlin/io/libp2p/tools/TCPProxy.kt +++ b/libp2p/src/testFixtures/kotlin/io/libp2p/tools/TCPProxy.kt @@ -7,7 +7,8 @@ import io.netty.channel.ChannelFuture import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelInboundHandlerAdapter import io.netty.channel.ChannelOption -import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.MultiThreadIoEventLoopGroup +import io.netty.channel.nio.NioIoHandler import io.netty.channel.socket.nio.NioServerSocketChannel import io.netty.channel.socket.nio.NioSocketChannel import io.netty.handler.logging.LogLevel @@ -19,7 +20,7 @@ class TCPProxy { fun start(listenPort: Int, dialHost: String, dialPort: Int): ChannelFuture { val future = ServerBootstrap().apply { - group(NioEventLoopGroup()) + group(MultiThreadIoEventLoopGroup(NioIoHandler.newFactory())) channel(NioServerSocketChannel::class.java) childHandler( nettyInitializer { @@ -29,7 +30,7 @@ class TCPProxy { serverCtx.channel().pipeline().addFirst(LoggingHandler("server", LogLevel.INFO)) Bootstrap().apply { - group(NioEventLoopGroup()) + group(MultiThreadIoEventLoopGroup(NioIoHandler.newFactory())) channel(NioSocketChannel::class.java) option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5 * 1000) handler(object : ChannelInboundHandlerAdapter() { @@ -38,7 +39,6 @@ class TCPProxy { } override fun channelActive(ctx: ChannelHandlerContext) { -// serverCtx.channel().pipeline().addFirst(LoggingHandler("client", LogLevel.INFO)) client.complete(ctx) } diff --git a/versions.gradle b/versions.gradle index 8e89ed8b6..d6f80d536 100644 --- a/versions.gradle +++ b/versions.gradle @@ -31,14 +31,17 @@ dependencyManagement { entry 'protobuf-java' entry 'protoc' } - dependencySet(group: "io.netty", version: "4.1.118.Final") { + dependencySet(group: "io.netty", version: "4.2.4.Final") { entry 'netty-common' entry 'netty-handler' entry 'netty-transport' entry 'netty-buffer' entry 'netty-codec-http' + entry 'netty-codec-protobuf' + entry 'netty-codec-native-quic' entry 'netty-transport-classes-epoll' } + dependency "io.netty:netty-tcnative-boringssl-static:2.0.72.Final" dependency "com.github.multiformats:java-multibase:v1.1.1" dependency "tech.pegasys:noise-java:22.1.0" dependencySet(group: "org.bouncycastle", version: "1.78.1") { @@ -46,6 +49,5 @@ dependencyManagement { entry 'bcpkix-jdk18on' entry 'bctls-jdk18on' } - dependency "io.netty.incubator:netty-incubator-codec-native-quic:0.0.71.Final" } } \ No newline at end of file