Skip to content

Commit 33ffc1a

Browse files
Use netty core instead of incubator artifact for QUIC (#412)
1 parent 3d4b05f commit 33ffc1a

File tree

12 files changed

+95
-68
lines changed

12 files changed

+95
-68
lines changed

examples/android-chatter/build.gradle

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,17 @@ android {
2222
}
2323
}
2424
packagingOptions {
25-
exclude 'META-INF/io.netty.versions.properties'
26-
exclude 'META-INF/INDEX.LIST'
27-
exclude 'META-INF/versions/9/OSGI-INF/MANIFEST.MF'
28-
exclude 'META-INF/license/LICENSE.aix-netbsd.txt'
29-
exclude 'META-INF/native-image/io.netty.incubator/netty-incubator-codec-native-quic/jni-config.json'
30-
exclude 'META-INF/native-image/io.netty.incubator/netty-incubator-codec-native-quic/reflect-config.json'
31-
exclude 'META-INF/native-image/io.netty.incubator/netty-incubator-codec-native-quic/resource-config.json'
32-
exclude 'META-INF/license/LICENSE.boringssl.txt'
33-
exclude 'META-INF/license/LICENSE.mvn-wrapper.txt'
34-
exclude 'META-INF/license/LICENSE.quiche.txt'
35-
exclude 'META-INF/license/LICENSE.tomcat-native.txt'
25+
resources {
26+
excludes.add("META-INF/io.netty.versions.properties")
27+
excludes.add("META-INF/INDEX.LIST")
28+
excludes.add("META-INF/versions/9/OSGI-INF/MANIFEST.MF")
29+
excludes.add("META-INF/native-image/io.netty/netty-codec-native-quic/jni-config.json")
30+
excludes.add("META-INF/native-image/io.netty/netty-codec-native-quic/reflect-config.json")
31+
excludes.add("META-INF/native-image/io.netty/netty-codec-native-quic/resource-config.json")
32+
excludes.add("META-INF/native-image/io.netty/netty-codec-native-quic/native-image.properties")
33+
excludes.add("META-INF/license/*")
34+
}
35+
3636
}
3737
kotlinOptions {
3838
jvmTarget = "11"

interop-test-client/src/main/java/io/libp2p/interop/InteropTestAgent.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import io.libp2p.protocol.Identify
1919
import io.libp2p.protocol.Ping
2020
import io.libp2p.security.noise.NoiseXXSecureChannel
2121
import io.libp2p.security.tls.TlsSecureChannel.Companion.ECDSA
22+
import io.libp2p.transport.quic.QuicTransport
2223
import io.libp2p.transport.tcp.TcpTransport
2324
import redis.clients.jedis.Jedis
2425
import java.util.concurrent.CompletableFuture
@@ -72,7 +73,7 @@ class InteropTestAgent(val params: InteropTestParams) {
7273
): Host = hostJ(Builder.Defaults.None, fn = {
7374
it.identity.factory = { privateKey }
7475
if (params.transport == "quic-v1") {
75-
// TODO add quic support
76+
it.secureTransports.add(QuicTransport::ECDSA)
7677
} else {
7778
it.transports.add(::TcpTransport)
7879
}

libp2p/build.gradle.kts

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,20 @@ dependencies {
1414
api("io.netty:netty-transport")
1515
implementation("io.netty:netty-handler")
1616
implementation("io.netty:netty-codec-http")
17+
implementation("io.netty:netty-codec-protobuf")
1718
implementation("io.netty:netty-transport-classes-epoll")
18-
implementation("io.netty.incubator:netty-incubator-codec-native-quic")
19+
implementation("io.netty:netty-codec-native-quic")
1920
// OS-specific bindings
20-
implementation("io.netty.incubator:netty-incubator-codec-native-quic::linux-x86_64")
21-
implementation("io.netty.incubator:netty-incubator-codec-native-quic::linux-aarch_64")
22-
implementation("io.netty.incubator:netty-incubator-codec-native-quic::osx-x86_64")
23-
implementation("io.netty.incubator:netty-incubator-codec-native-quic::osx-aarch_64")
24-
implementation("io.netty.incubator:netty-incubator-codec-native-quic::windows-x86_64")
25-
implementation("io.netty:netty-tcnative-boringssl-static:2.0.70.Final:linux-x86_64")
26-
implementation("io.netty:netty-tcnative-boringssl-static:2.0.70.Final:linux-aarch_64")
27-
implementation("io.netty:netty-tcnative-boringssl-static:2.0.70.Final:osx-x86_64")
28-
implementation("io.netty:netty-tcnative-boringssl-static:2.0.70.Final:osx-aarch_64")
29-
implementation("io.netty:netty-tcnative-boringssl-static:2.0.70.Final:windows-x86_64")
21+
implementation("io.netty:netty-codec-native-quic::linux-x86_64")
22+
implementation("io.netty:netty-codec-native-quic::linux-aarch_64")
23+
implementation("io.netty:netty-codec-native-quic::osx-x86_64")
24+
implementation("io.netty:netty-codec-native-quic::osx-aarch_64")
25+
implementation("io.netty:netty-codec-native-quic::windows-x86_64")
26+
implementation("io.netty:netty-tcnative-boringssl-static::linux-x86_64")
27+
implementation("io.netty:netty-tcnative-boringssl-static::linux-aarch_64")
28+
implementation("io.netty:netty-tcnative-boringssl-static::osx-x86_64")
29+
implementation("io.netty:netty-tcnative-boringssl-static::osx-aarch_64")
30+
implementation("io.netty:netty-tcnative-boringssl-static::windows-x86_64")
3031

3132
api("com.google.protobuf:protobuf-java")
3233

libp2p/src/main/kotlin/io/libp2p/etc/types/NettyExt.kt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import io.netty.channel.Channel
44
import io.netty.channel.ChannelFuture
55
import io.netty.channel.ChannelHandler
66
import io.netty.channel.ChannelPipeline
7+
import io.netty.util.concurrent.Future
78
import java.util.concurrent.CompletableFuture
89

910
fun ChannelFuture.toVoidCompletableFuture(): CompletableFuture<Unit> = toCompletableFuture().thenApply { }
@@ -20,6 +21,18 @@ fun ChannelFuture.toCompletableFuture(): CompletableFuture<Channel> {
2021
return ret
2122
}
2223

24+
fun Future<*>.toVoidCompletableFuture(): CompletableFuture<Unit> {
25+
val ret = CompletableFuture<Unit>()
26+
this.addListener {
27+
if (it.isSuccess) {
28+
ret.complete(Unit)
29+
} else {
30+
ret.completeExceptionally(it.cause())
31+
}
32+
}
33+
return ret
34+
}
35+
2336
fun ChannelPipeline.replace(oldHandler: ChannelHandler, newHandlers: List<Pair<String, ChannelHandler>>) {
2437
replace(oldHandler, newHandlers[0].first, newHandlers[0].second)
2538
for (i in 1 until newHandlers.size) {

libp2p/src/main/kotlin/io/libp2p/transport/implementation/PlainNettyTransport.kt

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ import io.netty.bootstrap.ServerBootstrap
1919
import io.netty.channel.Channel
2020
import io.netty.channel.ChannelHandler
2121
import io.netty.channel.ChannelOption
22-
import io.netty.channel.nio.NioEventLoopGroup
22+
import io.netty.channel.MultiThreadIoEventLoopGroup
23+
import io.netty.channel.nio.NioIoHandler
2324
import io.netty.channel.socket.nio.NioServerSocketChannel
2425
import io.netty.channel.socket.nio.NioSocketChannel
2526
import java.net.InetSocketAddress
@@ -39,8 +40,12 @@ abstract class PlainNettyTransport(
3940
private val listeners = mutableMapOf<Multiaddr, Channel>()
4041
private val channels = mutableListOf<Channel>()
4142

42-
private var workerGroup by lazyVar { NioEventLoopGroup() }
43-
private var bossGroup by lazyVar { NioEventLoopGroup(1) }
43+
private var workerGroup by lazyVar {
44+
MultiThreadIoEventLoopGroup(NioIoHandler.newFactory())
45+
}
46+
private var bossGroup by lazyVar {
47+
MultiThreadIoEventLoopGroup(1, NioIoHandler.newFactory())
48+
}
4449

4550
private var client by lazyVar {
4651
Bootstrap().apply {
@@ -85,10 +90,11 @@ abstract class PlainNettyTransport(
8590
val everythingThatNeedsToClose = unbindsCompleted.union(channelsClosed)
8691
val allClosed = CompletableFuture.allOf(*everythingThatNeedsToClose.toTypedArray())
8792

88-
return allClosed.thenApply {
89-
workerGroup.shutdownGracefully()
90-
bossGroup.shutdownGracefully()
91-
Unit
93+
return allClosed.thenCompose {
94+
CompletableFuture.allOf(
95+
workerGroup.shutdownGracefully().toVoidCompletableFuture(),
96+
bossGroup.shutdownGracefully().toVoidCompletableFuture()
97+
).thenApply { }
9298
}
9399
} // close
94100

libp2p/src/main/kotlin/io/libp2p/transport/quic/QuicTransport.kt

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@ import io.libp2p.etc.STREAM
1919
import io.libp2p.etc.types.*
2020
import io.libp2p.etc.util.MultiaddrUtils
2121
import io.libp2p.etc.util.netty.nettyInitializer
22-
import io.libp2p.security.tls.*
22+
import io.libp2p.security.tls.Libp2pTrustManager
23+
import io.libp2p.security.tls.buildCert
24+
import io.libp2p.security.tls.getJavaKey
25+
import io.libp2p.security.tls.getPublicKeyFromCert
26+
import io.libp2p.security.tls.verifyAndExtractPeerId
2327
import io.libp2p.transport.implementation.ConnectionOverNetty
2428
import io.libp2p.transport.implementation.NettyTransport
2529
import io.libp2p.transport.implementation.StreamOverNetty
@@ -29,12 +33,13 @@ import io.netty.buffer.PooledByteBufAllocator
2933
import io.netty.channel.*
3034
import io.netty.channel.epoll.Epoll
3135
import io.netty.channel.epoll.EpollDatagramChannel
32-
import io.netty.channel.nio.NioEventLoopGroup
36+
import io.netty.channel.nio.NioIoHandler
3337
import io.netty.channel.socket.nio.NioDatagramChannel
38+
import io.netty.handler.codec.quic.*
3439
import io.netty.handler.ssl.ClientAuth
35-
import io.netty.incubator.codec.quic.*
3640
import org.slf4j.LoggerFactory
37-
import java.net.*
41+
import java.net.InetSocketAddress
42+
import java.net.SocketAddress
3843
import java.time.Duration
3944
import java.util.*
4045
import java.util.concurrent.CompletableFuture
@@ -45,16 +50,18 @@ class QuicTransport(
4550
private val certAlgorithm: String,
4651
private val protocols: List<ProtocolBinding<*>>
4752
) : NettyTransport {
48-
private val log = LoggerFactory.getLogger(QuicTransport::class.java)
53+
54+
private val logger = LoggerFactory.getLogger(QuicTransport::class.java)
4955

5056
private var closed = false
5157
var connectTimeout = Duration.ofSeconds(15)
5258

5359
private val listeners = mutableMapOf<Multiaddr, Channel>()
5460
private val channels = mutableListOf<Channel>()
5561

56-
private var workerGroup by lazyVar { NioEventLoopGroup() }
57-
private var bossGroup by lazyVar { NioEventLoopGroup(1) }
62+
private var workerGroup by lazyVar {
63+
MultiThreadIoEventLoopGroup(NioIoHandler.newFactory())
64+
}
5865
private var allocator by lazyVar { PooledByteBufAllocator(true) }
5966
private var multistreamProtocol: MultistreamProtocol = MultistreamProtocolV1
6067
private var incomingMultistreamProtocol: MultistreamProtocol by lazyVar { multistreamProtocol }
@@ -78,7 +85,7 @@ class QuicTransport(
7885
}
7986

8087
@JvmStatic
81-
fun Ecdsa(k: PrivKey, p: List<ProtocolBinding<*>>): QuicTransport {
88+
fun ECDSA(k: PrivKey, p: List<ProtocolBinding<*>>): QuicTransport {
8289
return QuicTransport(k, "ECDSA", p)
8390
}
8491

@@ -128,10 +135,8 @@ class QuicTransport(
128135
val everythingThatNeedsToClose = unbindsCompleted.union(channelsClosed)
129136
val allClosed = CompletableFuture.allOf(*everythingThatNeedsToClose.toTypedArray())
130137

131-
return allClosed.thenApply {
132-
workerGroup.shutdownGracefully()
133-
bossGroup.shutdownGracefully()
134-
Unit
138+
return allClosed.thenCompose {
139+
workerGroup.shutdownGracefully().toVoidCompletableFuture()
135140
}
136141
}
137142

@@ -164,7 +169,7 @@ class QuicTransport(
164169
listeners -= addr
165170
}
166171
}
167-
log.info("Quic server listening on {}", addr)
172+
logger.info("Quic server listening on {}", addr)
168173
res.complete(null)
169174
}
170175
}
@@ -208,7 +213,6 @@ class QuicTransport(
208213
.option(ChannelOption.AUTO_READ, true)
209214
.option(ChannelOption.ALLOCATOR, allocator)
210215
.remoteAddress(fromMultiaddr(addr))
211-
// .handler(connHandler)
212216
.streamHandler(object : ChannelInboundHandlerAdapter() {
213217
override fun handlerAdded(ctx: ChannelHandlerContext?) {
214218
val connection = ctx!!.channel().parent().attr(CONNECTION).get() as Connection
@@ -285,7 +289,7 @@ class QuicTransport(
285289
val javaPrivateKey = getJavaKey(connectionKeys.first)
286290
val isClient = expectedRemotePeerId != null
287291
val cert = buildCert(localKey, connectionKeys.first)
288-
log.info("Building {} keys and cert for peerid {}", certAlgorithm, PeerId.fromPubKey(localKey.publicKey()))
292+
logger.info("Building {} keys and cert for peer id {}", certAlgorithm, PeerId.fromPubKey(localKey.publicKey()))
289293
return (
290294
if (isClient) {
291295
QuicSslContextBuilder.forClient().keyManager(javaPrivateKey, null, cert)
@@ -326,7 +330,7 @@ class QuicTransport(
326330
val remotePeerId = verifyAndExtractPeerId(arrayOf(remoteCert))
327331
val remotePublicKey = getPublicKeyFromCert(arrayOf(remoteCert))
328332

329-
log.info("Handshake completed with remote peer id: {}", remotePeerId)
333+
logger.info("Handshake completed with remote peer id: {}", remotePeerId)
330334

331335
connection.setSecureSession(
332336
SecureChannel.Session(
@@ -354,7 +358,7 @@ class QuicTransport(
354358

355359
@Deprecated("Deprecated in Java")
356360
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
357-
log.error("An error during handshake", cause)
361+
logger.error("An error during handshake", cause)
358362
ctx.close()
359363
}
360364
}
@@ -374,8 +378,8 @@ class QuicTransport(
374378
val connection: ConnectionOverNetty
375379
) : StreamMuxer.Session {
376380
override fun <T> createStream(protocols: List<ProtocolBinding<T>>): StreamPromise<T> {
377-
var multistreamProtocol: MultistreamProtocol = MultistreamProtocolV1
378-
var streamMultistreamProtocol: MultistreamProtocol by lazyVar { multistreamProtocol }
381+
val multistreamProtocol: MultistreamProtocol = MultistreamProtocolV1
382+
val streamMultistreamProtocol: MultistreamProtocol by lazyVar { multistreamProtocol }
379383
val multi = streamMultistreamProtocol.createMultistream(protocols)
380384

381385
val controller = CompletableFuture<T>()

libp2p/src/main/kotlin/io/libp2p/transport/ws/WebSocketClientInitializer.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@ internal class WebSocketClientInitializer(
1212
private val url: String
1313
) : ChannelInitializer<SocketChannel>() {
1414

15-
public override fun initChannel(ch: SocketChannel) {
15+
override fun initChannel(ch: SocketChannel) {
1616
val pipeline = ch.pipeline()
1717

1818
pipeline.addLast(HttpClientCodec())
1919
pipeline.addLast(HttpObjectAggregator(65536))
20-
pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE)
20+
pipeline.addLast(WebSocketClientCompressionHandler(0))
2121
pipeline.addLast(
2222
WebSocketClientHandshake(
2323
connectionBuilder,

libp2p/src/main/kotlin/io/libp2p/transport/ws/WebSocketServerInitializer.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@ internal class WebSocketServerInitializer(
1212
private val connectionBuilder: ChannelHandler
1313
) : ChannelInitializer<SocketChannel>() {
1414

15-
public override fun initChannel(ch: SocketChannel) {
15+
override fun initChannel(ch: SocketChannel) {
1616
val pipeline = ch.pipeline()
1717

1818
pipeline.addLast(HttpServerCodec())
1919
pipeline.addLast(HttpObjectAggregator(65536))
20-
pipeline.addLast(WebSocketServerCompressionHandler())
20+
pipeline.addLast(WebSocketServerCompressionHandler(0))
2121
pipeline.addLast(WebSocketServerProtocolHandler("/", null, true))
2222
pipeline.addLast(WebSocketServerHandshakeListener(connectionBuilder))
2323
} // initChannel

libp2p/src/test/java/io/libp2p/transport/quic/QuicKuboTestJava.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ void pingKubo() throws Exception {
2121
PeerId peerId = PeerId.fromBase58(getKuboPeerId());
2222

2323
Host clientHost =
24-
new HostBuilder().keyType(KeyType.ED25519).secureTransport(QuicTransport::Ecdsa).build();
24+
new HostBuilder().keyType(KeyType.ED25519).secureTransport(QuicTransport::ECDSA).build();
2525

2626
CompletableFuture<Void> clientStarted = clientHost.start();
2727
clientStarted.get(5, TimeUnit.SECONDS);

libp2p/src/test/java/io/libp2p/transport/quic/QuicServerTestJava.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ void pingJava() throws Exception {
3232
Host clientHost =
3333
new HostBuilder()
3434
.keyType(KeyType.ED25519)
35-
.secureTransport(QuicTransport::Ecdsa)
35+
.secureTransport(QuicTransport::ECDSA)
3636
.transport(TcpTransport::new)
3737
.secureChannel(TlsSecureChannel::ECDSA)
3838
.muxer(StreamMuxerProtocol::getYamux)
@@ -41,7 +41,7 @@ void pingJava() throws Exception {
4141
Host serverHost =
4242
new HostBuilder()
4343
.keyType(KeyType.ED25519)
44-
.secureTransport(QuicTransport::Ecdsa)
44+
.secureTransport(QuicTransport::ECDSA)
4545
.transport(TcpTransport::new)
4646
.secureChannel(TlsSecureChannel::ECDSA)
4747
.muxer(StreamMuxerProtocol::getYamux)
@@ -105,7 +105,7 @@ void tlsAndQuicInSameHostPing() throws Exception {
105105
Host clientHost =
106106
new HostBuilder()
107107
.keyType(KeyType.ED25519)
108-
.secureTransport(QuicTransport::Ecdsa)
108+
.secureTransport(QuicTransport::ECDSA)
109109
.transport(TcpTransport::new)
110110
.secureChannel(TlsSecureChannel::ECDSA)
111111
.secureChannel(NoiseXXSecureChannel::new)
@@ -115,7 +115,7 @@ void tlsAndQuicInSameHostPing() throws Exception {
115115
Host serverHost =
116116
new HostBuilder()
117117
.keyType(KeyType.ED25519)
118-
.secureTransport(QuicTransport::Ecdsa)
118+
.secureTransport(QuicTransport::ECDSA)
119119
.transport(TcpTransport::new)
120120
.secureChannel(TlsSecureChannel::ECDSA)
121121
.secureChannel(NoiseXXSecureChannel::new)
@@ -201,15 +201,15 @@ void largeBlob() throws Exception {
201201
Host clientHost =
202202
new HostBuilder()
203203
.keyType(KeyType.ED25519)
204-
.secureTransport(QuicTransport::Ecdsa)
204+
.secureTransport(QuicTransport::ECDSA)
205205
.builderModifier(
206206
b -> b.getDebug().getMuxFramesHandler().addCompactLogger(LogLevel.ERROR, "client"))
207207
.build();
208208

209209
Host serverHost =
210210
new HostBuilder()
211211
.keyType(KeyType.ED25519)
212-
.secureTransport(QuicTransport::Ecdsa)
212+
.secureTransport(QuicTransport::ECDSA)
213213
.protocol(new Blob(blobSize))
214214
.listen(localListenAddress)
215215
.builderModifier(
@@ -262,12 +262,12 @@ void startHostAddPing() throws Exception {
262262
String localListenAddress = "/ip4/127.0.0.1/udp/" + getPort() + "/quic-v1";
263263

264264
Host clientHost =
265-
new HostBuilder().keyType(KeyType.ED25519).secureTransport(QuicTransport::Ecdsa).build();
265+
new HostBuilder().keyType(KeyType.ED25519).secureTransport(QuicTransport::ECDSA).build();
266266

267267
Host serverHost =
268268
new HostBuilder()
269269
.keyType(KeyType.ED25519)
270-
.secureTransport(QuicTransport::Ecdsa)
270+
.secureTransport(QuicTransport::ECDSA)
271271
.listen(localListenAddress)
272272
.build();
273273

0 commit comments

Comments
 (0)