Skip to content

Commit e7ae1b6

Browse files
authored
Follow up to #407: refactor NettyTransport (#408)
* Relay local/remote multiaddress retrieval to Transport implementations to get rid of subclass checks
1 parent 183061b commit e7ae1b6

File tree

10 files changed

+323
-300
lines changed

10 files changed

+323
-300
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package io.libp2p.etc.util
2+
3+
import io.libp2p.core.InternalErrorException
4+
import io.libp2p.core.multiformats.Multiaddr
5+
import io.libp2p.core.multiformats.Protocol
6+
import java.net.*
7+
8+
class MultiaddrUtils {
9+
10+
companion object {
11+
12+
fun inetAddressToIpMultiaddr(addr: InetAddress): Multiaddr {
13+
val proto = when (addr) {
14+
is Inet4Address -> Protocol.IP4
15+
is Inet6Address -> Protocol.IP6
16+
else -> throw InternalErrorException("Unknown address type $addr")
17+
}
18+
return Multiaddr.empty()
19+
.withComponent(proto, addr.hostAddress)
20+
}
21+
22+
fun inetSocketAddressToTcpMultiaddr(addr: InetSocketAddress): Multiaddr =
23+
inetAddressToIpMultiaddr(addr.address)
24+
.withComponent(Protocol.TCP, addr.port.toString())
25+
26+
fun inetSocketAddressToUdpMultiaddr(addr: InetSocketAddress): Multiaddr =
27+
inetAddressToIpMultiaddr(addr.address)
28+
.withComponent(Protocol.UDP, addr.port.toString())
29+
}
30+
}

libp2p/src/main/kotlin/io/libp2p/transport/implementation/ConnectionBuilder.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import io.libp2p.core.Connection
55
import io.libp2p.core.ConnectionHandler
66
import io.libp2p.core.P2PChannel
77
import io.libp2p.core.PeerId
8-
import io.libp2p.core.transport.Transport
98
import io.libp2p.etc.REMOTE_PEER_ID
109
import io.libp2p.etc.types.forward
1110
import io.libp2p.transport.ConnectionUpgrader
@@ -14,7 +13,7 @@ import io.netty.channel.ChannelInitializer
1413
import java.util.concurrent.CompletableFuture
1514

1615
class ConnectionBuilder(
17-
private val transport: Transport,
16+
private val transport: NettyTransport,
1817
private val upgrader: ConnectionUpgrader,
1918
private val connHandler: ConnectionHandler,
2019
private val initiator: Boolean,
Lines changed: 4 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,11 @@
11
package io.libp2p.transport.implementation
22

33
import io.libp2p.core.Connection
4-
import io.libp2p.core.InternalErrorException
54
import io.libp2p.core.multiformats.Multiaddr
6-
import io.libp2p.core.multiformats.Protocol
75
import io.libp2p.core.mux.StreamMuxer
86
import io.libp2p.core.security.SecureChannel
9-
import io.libp2p.core.transport.Transport
107
import io.libp2p.etc.CONNECTION
11-
import io.libp2p.transport.quic.QuicTransport
128
import io.netty.channel.Channel
13-
import io.netty.incubator.codec.quic.QuicChannel
14-
import java.net.Inet4Address
15-
import java.net.Inet6Address
16-
import java.net.InetSocketAddress
179

1810
/**
1911
* A Connection is a high-level wrapper around a Netty Channel representing the conduit to a peer.
@@ -22,7 +14,7 @@ import java.net.InetSocketAddress
2214
*/
2315
open class ConnectionOverNetty(
2416
ch: Channel,
25-
private val transport: Transport,
17+
private val nettyTransport: NettyTransport,
2618
initiator: Boolean
2719
) : Connection, P2PChannelOverNetty(ch, initiator) {
2820
private lateinit var muxerSession: StreamMuxer.Session
@@ -41,41 +33,8 @@ open class ConnectionOverNetty(
4133

4234
override fun muxerSession() = muxerSession
4335
override fun secureSession() = secureSession
44-
override fun transport() = transport
36+
override fun transport() = nettyTransport
4537

46-
override fun localAddress(): Multiaddr {
47-
if (nettyChannel is QuicChannel) {
48-
return toMultiaddr(nettyChannel.localSocketAddress() as InetSocketAddress)
49-
} else {
50-
return toMultiaddr(nettyChannel.localAddress() as InetSocketAddress)
51-
}
52-
}
53-
override fun remoteAddress(): Multiaddr {
54-
if (nettyChannel is QuicChannel) {
55-
return toMultiaddr(nettyChannel.remoteSocketAddress() as InetSocketAddress)
56-
} else {
57-
return toMultiaddr(nettyChannel.remoteAddress() as InetSocketAddress)
58-
}
59-
}
60-
61-
private fun toMultiaddr(addr: InetSocketAddress): Multiaddr {
62-
if (transport is NettyTransport) {
63-
return transport.toMultiaddr(addr)
64-
} else if (transport is QuicTransport) {
65-
return transport.toMultiaddr(addr)
66-
} else {
67-
return toMultiaddrDefault(addr)
68-
}
69-
}
70-
71-
fun toMultiaddrDefault(addr: InetSocketAddress): Multiaddr {
72-
val proto = when (addr.address) {
73-
is Inet4Address -> Protocol.IP4
74-
is Inet6Address -> Protocol.IP6
75-
else -> throw InternalErrorException("Unknown address type $addr")
76-
}
77-
return Multiaddr.empty()
78-
.withComponent(proto, addr.address.hostAddress)
79-
.withComponent(Protocol.TCP, addr.port.toString())
80-
} // toMultiaddr
38+
override fun localAddress(): Multiaddr = nettyTransport.localAddress(nettyChannel)
39+
override fun remoteAddress(): Multiaddr = nettyTransport.remoteAddress(nettyChannel)
8140
}
Lines changed: 7 additions & 200 deletions
Original file line numberDiff line numberDiff line change
@@ -1,208 +1,15 @@
11
package io.libp2p.transport.implementation
22

3-
import io.libp2p.core.ChannelVisitor
4-
import io.libp2p.core.Connection
5-
import io.libp2p.core.ConnectionHandler
6-
import io.libp2p.core.Libp2pException
7-
import io.libp2p.core.P2PChannel
8-
import io.libp2p.core.PeerId
93
import io.libp2p.core.multiformats.Multiaddr
10-
import io.libp2p.core.multiformats.MultiaddrDns
11-
import io.libp2p.core.multiformats.Protocol
124
import io.libp2p.core.transport.Transport
13-
import io.libp2p.etc.types.lazyVar
14-
import io.libp2p.etc.types.toCompletableFuture
15-
import io.libp2p.etc.types.toVoidCompletableFuture
16-
import io.libp2p.etc.util.netty.nettyInitializer
17-
import io.libp2p.transport.ConnectionUpgrader
18-
import io.netty.bootstrap.Bootstrap
19-
import io.netty.bootstrap.ServerBootstrap
205
import io.netty.channel.Channel
21-
import io.netty.channel.ChannelHandler
22-
import io.netty.channel.ChannelOption
23-
import io.netty.channel.nio.NioEventLoopGroup
24-
import io.netty.channel.socket.nio.NioServerSocketChannel
25-
import io.netty.channel.socket.nio.NioSocketChannel
26-
import java.net.InetSocketAddress
27-
import java.time.Duration
28-
import java.util.concurrent.CompletableFuture
296

30-
abstract class NettyTransport(
31-
private val upgrader: ConnectionUpgrader
32-
) : Transport {
33-
private var closed = false
34-
var connectTimeout = Duration.ofSeconds(15)
7+
/**
8+
* A `Transport` which relies on a Netty `Channel`
9+
*/
10+
interface NettyTransport : Transport {
3511

36-
private val listeners = mutableMapOf<Multiaddr, Channel>()
37-
private val channels = mutableListOf<Channel>()
12+
fun localAddress(nettyChannel: Channel): Multiaddr
3813

39-
private var workerGroup by lazyVar { NioEventLoopGroup() }
40-
private var bossGroup by lazyVar { NioEventLoopGroup(1) }
41-
42-
private var client by lazyVar {
43-
Bootstrap().apply {
44-
group(workerGroup)
45-
channel(NioSocketChannel::class.java)
46-
option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout.toMillis().toInt())
47-
}
48-
}
49-
50-
private var server by lazyVar {
51-
ServerBootstrap().apply {
52-
group(bossGroup, workerGroup)
53-
channel(NioServerSocketChannel::class.java)
54-
}
55-
}
56-
57-
override val activeListeners: Int
58-
get() = listeners.size
59-
override val activeConnections: Int
60-
get() = channels.size
61-
62-
override fun listenAddresses(): List<Multiaddr> {
63-
return listeners.values.map {
64-
toMultiaddr(it.localAddress() as InetSocketAddress)
65-
}
66-
}
67-
68-
override fun initialize() {
69-
}
70-
71-
override fun close(): CompletableFuture<Unit> {
72-
closed = true
73-
74-
val unbindsCompleted = listeners
75-
.map { (_, ch) -> ch }
76-
.map { it.close().toVoidCompletableFuture() }
77-
78-
val channelsClosed = channels
79-
.toMutableList() // need a copy to avoid potential co-modification problems
80-
.map { it.close().toVoidCompletableFuture() }
81-
82-
val everythingThatNeedsToClose = unbindsCompleted.union(channelsClosed)
83-
val allClosed = CompletableFuture.allOf(*everythingThatNeedsToClose.toTypedArray())
84-
85-
return allClosed.thenApply {
86-
workerGroup.shutdownGracefully()
87-
bossGroup.shutdownGracefully()
88-
Unit
89-
}
90-
} // close
91-
92-
override fun listen(addr: Multiaddr, connHandler: ConnectionHandler, preHandler: ChannelVisitor<P2PChannel>?): CompletableFuture<Unit> {
93-
if (closed) throw Libp2pException("Transport is closed")
94-
95-
val connectionBuilder = makeConnectionBuilder(connHandler, false, preHandler = preHandler)
96-
val channelHandler = serverTransportBuilder(connectionBuilder, addr) ?: connectionBuilder
97-
98-
val listener = server.clone()
99-
.childHandler(
100-
nettyInitializer { init ->
101-
registerChannel(init.channel)
102-
init.addLastLocal(channelHandler)
103-
}
104-
)
105-
106-
val bindComplete = listener.bind(fromMultiaddr(addr))
107-
108-
bindComplete.also {
109-
synchronized(this@NettyTransport) {
110-
listeners += addr to it.channel()
111-
it.channel().closeFuture().addListener {
112-
synchronized(this@NettyTransport) {
113-
listeners -= addr
114-
}
115-
}
116-
}
117-
}
118-
119-
return bindComplete.toVoidCompletableFuture()
120-
} // listener
121-
122-
protected abstract fun serverTransportBuilder(
123-
connectionBuilder: ConnectionBuilder,
124-
addr: Multiaddr
125-
): ChannelHandler?
126-
127-
override fun unlisten(addr: Multiaddr): CompletableFuture<Unit> {
128-
return listeners[addr]?.close()?.toVoidCompletableFuture()
129-
?: throw Libp2pException("No listeners on address $addr")
130-
} // unlisten
131-
132-
override fun dial(addr: Multiaddr, connHandler: ConnectionHandler, preHandler: ChannelVisitor<P2PChannel>?): CompletableFuture<Connection> {
133-
if (closed) throw Libp2pException("Transport is closed")
134-
135-
val remotePeerId = addr.getPeerId()
136-
val connectionBuilder = makeConnectionBuilder(connHandler, true, remotePeerId, preHandler)
137-
val channelHandler = clientTransportBuilder(connectionBuilder, addr) ?: connectionBuilder
138-
139-
val chanFuture = client.clone()
140-
.handler(channelHandler)
141-
.connect(fromMultiaddr(addr))
142-
.also { registerChannel(it.channel()) }
143-
144-
return chanFuture.toCompletableFuture()
145-
.thenCompose { connectionBuilder.connectionEstablished }
146-
} // dial
147-
148-
protected abstract fun clientTransportBuilder(
149-
connectionBuilder: ConnectionBuilder,
150-
addr: Multiaddr
151-
): ChannelHandler?
152-
153-
private fun registerChannel(ch: Channel) {
154-
if (closed) {
155-
ch.close()
156-
return
157-
}
158-
159-
synchronized(this@NettyTransport) {
160-
channels += ch
161-
ch.closeFuture().addListener {
162-
synchronized(this@NettyTransport) {
163-
channels -= ch
164-
}
165-
}
166-
}
167-
} // registerChannel
168-
169-
private fun makeConnectionBuilder(
170-
connHandler: ConnectionHandler,
171-
initiator: Boolean,
172-
remotePeerId: PeerId? = null,
173-
preHandler: ChannelVisitor<P2PChannel>?
174-
) = ConnectionBuilder(
175-
this,
176-
upgrader,
177-
connHandler,
178-
initiator,
179-
remotePeerId,
180-
preHandler
181-
)
182-
183-
protected fun handlesHost(addr: Multiaddr) =
184-
addr.hasAny(Protocol.IP4, Protocol.IP6, Protocol.DNS4, Protocol.DNS6, Protocol.DNSADDR)
185-
186-
protected fun hostFromMultiaddr(addr: Multiaddr): String {
187-
val resolvedAddresses = MultiaddrDns.resolve(addr)
188-
if (resolvedAddresses.isEmpty()) {
189-
throw Libp2pException("Could not resolve $addr to an IP address")
190-
}
191-
192-
return resolvedAddresses[0].components.find {
193-
it.protocol in arrayOf(Protocol.IP4, Protocol.IP6)
194-
}?.stringValue ?: throw Libp2pException("Missing IP4/IP6 in multiaddress $addr")
195-
}
196-
197-
protected fun portFromMultiaddr(addr: Multiaddr) =
198-
addr.components.find { p -> p.protocol == Protocol.TCP }
199-
?.stringValue?.toInt() ?: throw Libp2pException("Missing TCP in multiaddress $addr")
200-
201-
private fun fromMultiaddr(addr: Multiaddr): InetSocketAddress {
202-
val host = hostFromMultiaddr(addr)
203-
val port = portFromMultiaddr(addr)
204-
return InetSocketAddress(host, port)
205-
} // fromMultiaddr
206-
207-
abstract fun toMultiaddr(addr: InetSocketAddress): Multiaddr
208-
} // class NettyTransportBase
14+
fun remoteAddress(nettyChannel: Channel): Multiaddr
15+
}

0 commit comments

Comments
 (0)