Skip to content

Commit e4c25d5

Browse files
Add interop test support (#421)
1 parent 8b62e1f commit e4c25d5

File tree

7 files changed

+395
-1
lines changed

7 files changed

+395
-1
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,3 +190,6 @@ $RECYCLE.BIN/
190190
node_modules
191191
package-lock.json
192192
/src/jmh/java/generated/
193+
194+
#Jenv
195+
.java-version

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ configure(
187187
}
188188

189189
detekt {
190-
config = files("$rootDir/detekt/config.yml")
190+
config.from("$rootDir/detekt/config.yml")
191191
buildUponDefaultConfig = true
192192
}
193193
}

interop-test-client/build.gradle.kts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
plugins {
2+
id("java")
3+
id("com.bmuschko.docker-java-application") version "9.4.0"
4+
}
5+
6+
dependencies {
7+
implementation(project(":libp2p"))
8+
implementation("redis.clients:jedis:6.1.0")
9+
runtimeOnly("org.apache.logging.log4j:log4j-slf4j2-impl")
10+
}
11+
12+
docker {
13+
javaApplication {
14+
baseImage.set("openjdk:11-jdk")
15+
ports.set(listOf(4041))
16+
}
17+
}
18+
19+
val composeFileSpec: CopySpec = copySpec {
20+
from("src/test/resources")
21+
include("compose.yaml")
22+
}
23+
24+
val copyAssets = tasks.register<Copy>("copyAssets") {
25+
into(layout.buildDirectory.dir("docker"))
26+
with(composeFileSpec)
27+
}
28+
29+
tasks.dockerCreateDockerfile {
30+
dependsOn(copyAssets)
31+
}
Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
package io.libp2p.interop
2+
3+
import identify.pb.IdentifyOuterClass
4+
import io.libp2p.core.Connection
5+
import io.libp2p.core.ConnectionHandler
6+
import io.libp2p.core.Host
7+
import io.libp2p.core.PeerId.Companion.fromPubKey
8+
import io.libp2p.core.crypto.PrivKey
9+
import io.libp2p.core.dsl.Builder
10+
import io.libp2p.core.dsl.hostJ
11+
import io.libp2p.core.multiformats.Multiaddr
12+
import io.libp2p.core.multistream.ProtocolBinding
13+
import io.libp2p.core.mux.StreamMuxerProtocol
14+
import io.libp2p.core.mux.StreamMuxerProtocol.Companion.Mplex
15+
import io.libp2p.core.mux.StreamMuxerProtocol.Companion.getYamux
16+
import io.libp2p.crypto.keys.generateEd25519KeyPair
17+
import io.libp2p.etc.types.toProtobuf
18+
import io.libp2p.protocol.Identify
19+
import io.libp2p.protocol.Ping
20+
import io.libp2p.security.noise.NoiseXXSecureChannel
21+
import io.libp2p.security.tls.TlsSecureChannel.Companion.ECDSA
22+
import io.libp2p.transport.tcp.TcpTransport
23+
import redis.clients.jedis.Jedis
24+
import java.util.concurrent.CompletableFuture
25+
import java.util.concurrent.TimeUnit
26+
import java.util.stream.Collectors
27+
import kotlin.random.Random
28+
import kotlin.system.exitProcess
29+
30+
private const val REDIS_KEY_LISTENER_ADDRESS = "listenerAddr"
31+
32+
class InteropTestAgent(val params: InteropTestParams) {
33+
34+
private val advertisedAddress: Multiaddr
35+
private val node: Host
36+
37+
init {
38+
val port = 10000 + Random.nextInt(50000)
39+
val isTcp = "tcp" == params.transport
40+
val ip = params.ip
41+
val protocol = if (isTcp) "tcp" else "udp"
42+
val maybeQuicSuffix = (if (isTcp) "" else "/quic-v1")
43+
val address =
44+
Multiaddr.fromString("/ip4/$ip/$protocol/${port}$maybeQuicSuffix")
45+
46+
val privateKey = generateEd25519KeyPair().first
47+
val peerID = fromPubKey(privateKey.publicKey())
48+
advertisedAddress = address.withP2P(peerID)
49+
50+
val listenAddresses = ArrayList<String>()
51+
listenAddresses.add(address.toString())
52+
val protocols = createProtocols(privateKey, listenAddresses)
53+
node = createHost(privateKey, protocols, listenAddresses)
54+
}
55+
56+
fun run(): CompletableFuture<Void> {
57+
return node.start()
58+
.thenCompose { startJedisConnection() }
59+
.thenCompose { jedis ->
60+
if (params.isDialer) {
61+
startDialer(jedis, node)
62+
} else {
63+
startListener(jedis, advertisedAddress)
64+
}
65+
}.whenComplete { _, _ -> node.stop() }
66+
}
67+
68+
private fun createHost(
69+
privateKey: PrivKey,
70+
protocols: ArrayList<ProtocolBinding<Any>>,
71+
listenAddresses: ArrayList<String>
72+
): Host = hostJ(Builder.Defaults.None, fn = {
73+
it.identity.factory = { privateKey }
74+
if (params.transport == "quic-v1") {
75+
// TODO add quic support
76+
} else {
77+
it.transports.add(::TcpTransport)
78+
}
79+
80+
if ("noise" == params.security) {
81+
it.secureChannels.add(::NoiseXXSecureChannel)
82+
} else if ("tls" == params.security) {
83+
it.secureChannels.add(::ECDSA)
84+
}
85+
86+
val muxers = ArrayList<StreamMuxerProtocol>()
87+
if ("mplex" == params.muxer) {
88+
muxers.add(Mplex)
89+
} else if ("yamux" == params.muxer) {
90+
muxers.add(getYamux())
91+
}
92+
it.muxers.addAll(muxers)
93+
94+
for (protocol in protocols) {
95+
it.protocols.add(protocol)
96+
}
97+
98+
for (listenAddr in listenAddresses) {
99+
it.network.listen(listenAddr)
100+
}
101+
102+
it.connectionHandlers.add {
103+
ConnectionHandler { conn: Connection ->
104+
printDiagnosticsLog(
105+
(
106+
conn.localAddress()
107+
.toString() + " received connection from " +
108+
conn.remoteAddress() +
109+
" on transport " +
110+
conn.transport()
111+
)
112+
)
113+
}
114+
}
115+
})
116+
117+
private fun startJedisConnection(): CompletableFuture<Jedis> {
118+
return CompletableFuture.supplyAsync {
119+
val jedis = Jedis("http://${params.redisAddress}")
120+
var isReady = false
121+
while (!isReady) {
122+
if ("PONG" == jedis.ping()) {
123+
isReady = true
124+
} else {
125+
printDiagnosticsLog("waiting for redis to start...")
126+
Thread.sleep(1000)
127+
}
128+
}
129+
printDiagnosticsLog("Connection established to Redis ($jedis)")
130+
jedis
131+
}
132+
}
133+
134+
/*
135+
Start dialer and try to connect with a listener
136+
*/
137+
private fun startDialer(jedis: Jedis, node: Host): CompletableFuture<Void> {
138+
return CompletableFuture.supplyAsync {
139+
printDiagnosticsLog("Starting dialer")
140+
141+
val listenerAddresses =
142+
jedis.blpop(params.testTimeoutInSeconds, REDIS_KEY_LISTENER_ADDRESS)
143+
if (listenerAddresses == null || listenerAddresses.isEmpty()) {
144+
throw IllegalStateException("listenerAddr not set")
145+
}
146+
147+
val listenerAddr =
148+
Multiaddr.fromString(listenerAddresses.first { s -> s.startsWith("/") })
149+
150+
printDiagnosticsLog("Sending ping messages to $listenerAddr")
151+
152+
val handshakeStart = System.currentTimeMillis()
153+
154+
val pingController = Ping().dial(node, listenerAddr).controller.join()
155+
val pingRTTMillis = pingController.ping().join()
156+
val handshakeEnd = System.currentTimeMillis()
157+
val handshakePlusOneRTT = handshakeEnd - handshakeStart
158+
159+
printDiagnosticsLog("Ping latency $pingRTTMillis ms")
160+
161+
val jsonResult =
162+
"{\"handshakePlusOneRTTMillis\":${handshakePlusOneRTT.toDouble()}, \"pingRTTMilllis\": ${pingRTTMillis.toDouble()}}"
163+
164+
emitResult(jsonResult)
165+
null
166+
}
167+
}
168+
169+
/*
170+
Start listener and wait up to testTimeoutInSeconds for a message from dialer
171+
*/
172+
private fun startListener(
173+
jedis: Jedis,
174+
advertisedAddress: Multiaddr
175+
): CompletableFuture<Void> {
176+
return CompletableFuture.supplyAsync {
177+
println("Starting listener with advertisedAddress: $advertisedAddress")
178+
179+
jedis.rpush(REDIS_KEY_LISTENER_ADDRESS, advertisedAddress.toString())
180+
181+
// Wait for dialer
182+
Thread.sleep(params.testTimeoutInSeconds.toLong() * 1000L)
183+
null
184+
}
185+
}
186+
187+
private fun createProtocols(
188+
privateKey: PrivKey,
189+
listenAddresses: ArrayList<String>
190+
): ArrayList<ProtocolBinding<Any>> {
191+
var identifyBuilder =
192+
IdentifyOuterClass.Identify.newBuilder()
193+
.setProtocolVersion("ipfs/0.1.0")
194+
.setAgentVersion("jvm-libp2p/v1.0.0")
195+
.setPublicKey(privateKey.publicKey().bytes().toProtobuf())
196+
.addAllListenAddrs(
197+
listenAddresses.stream()
198+
.map(Multiaddr::fromString)
199+
.map(Multiaddr::serialize)
200+
.map(ByteArray::toProtobuf)
201+
.collect(Collectors.toList())
202+
)
203+
204+
val protocols = ArrayList<ProtocolBinding<Any>>()
205+
protocols.add(Ping())
206+
for (protocol in protocols) {
207+
identifyBuilder =
208+
identifyBuilder.addAllProtocols(protocol.protocolDescriptor.announceProtocols)
209+
}
210+
protocols.add(Identify(identifyBuilder.build()))
211+
212+
return protocols
213+
}
214+
}
215+
216+
private fun emitResult(json: String) {
217+
println(json)
218+
}
219+
220+
private fun printDiagnosticsLog(msg: String) {
221+
System.err.println(msg)
222+
}
223+
224+
@SuppressWarnings("unused")
225+
fun main() {
226+
try {
227+
val params = InteropTestParams.Builder().fromEnvironmentVariables().build()
228+
229+
InteropTestAgent(params).run()
230+
.orTimeout(params.testTimeoutInSeconds.toLong(), TimeUnit.SECONDS)
231+
.join()
232+
} catch (e: Exception) {
233+
printDiagnosticsLog("Unexpected exit: $e")
234+
exitProcess(-1)
235+
}
236+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package io.libp2p.interop
2+
3+
import java.net.Inet6Address
4+
import java.net.NetworkInterface
5+
import java.util.stream.Collectors
6+
7+
class InteropTestParams(
8+
val transport: String?,
9+
val muxer: String?,
10+
val security: String?,
11+
val isDialer: Boolean,
12+
val ip: String?,
13+
val redisAddress: String?,
14+
val testTimeoutInSeconds: Int
15+
) {
16+
17+
data class Builder(
18+
var transport: String? = "",
19+
var muxer: String? = "",
20+
var security: String? = "",
21+
var isDialer: Boolean = false,
22+
var ip: String? = "",
23+
var redisAddress: String? = "",
24+
var testTimeoutInSeconds: Int = 180
25+
) {
26+
fun transport(transport: String) = apply { this.transport = transport }
27+
fun muxer(muxer: String) = apply { this.muxer = muxer }
28+
fun security(security: String) = apply { this.security = security }
29+
fun isDialer(isDialer: Boolean) = apply { this.isDialer = isDialer }
30+
fun ip(ip: String) = apply { this.ip = ip }
31+
fun redisAddress(redisAddress: String) = apply { this.redisAddress = redisAddress }
32+
fun testTimeoutInSeconds(testTimeoutInSeconds: Int) =
33+
apply { this.testTimeoutInSeconds = testTimeoutInSeconds }
34+
35+
fun build(): InteropTestParams {
36+
checkNonEmptyParam("transport", transport)
37+
checkNonEmptyParam("muxer", muxer)
38+
checkNonEmptyParam("security", security)
39+
checkNonEmptyParam("redis_addr", security)
40+
41+
if (ip == null || ip!!.isBlank()) {
42+
ip = "0.0.0.0"
43+
}
44+
if (!isDialer && ip.equals("0.0.0.0")) {
45+
ip = getLocalIPAddress()
46+
}
47+
48+
return InteropTestParams(
49+
transport,
50+
muxer,
51+
security,
52+
isDialer,
53+
ip,
54+
redisAddress,
55+
testTimeoutInSeconds
56+
)
57+
}
58+
59+
private fun checkNonEmptyParam(paramName: String, paramValue: String?) {
60+
if (paramValue == null) {
61+
throw IllegalArgumentException("Parameter '$paramName' must be non-empty")
62+
}
63+
}
64+
65+
fun fromEnvironmentVariables(): Builder {
66+
return Builder(
67+
transport = System.getenv("transport"),
68+
muxer = System.getenv("muxer"),
69+
security = System.getenv("security"),
70+
isDialer = System.getenv("is_dialer")?.toBooleanStrictOrNull() ?: false,
71+
ip = System.getenv("ip"),
72+
redisAddress = System.getenv("redis_addr"),
73+
testTimeoutInSeconds = System.getenv("test_timeout_seconds")?.toInt() ?: 180
74+
)
75+
}
76+
77+
private fun getLocalIPAddress(): String {
78+
val interfaces =
79+
NetworkInterface.networkInterfaces().collect(Collectors.toList())
80+
for (inter in interfaces) {
81+
for (addr in inter.interfaceAddresses) {
82+
val address = addr.address
83+
if (!address.isLoopbackAddress && address !is Inet6Address) return address.hostAddress
84+
}
85+
}
86+
throw IllegalStateException("Unable to determine local IPAddress")
87+
}
88+
}
89+
90+
override fun toString(): String {
91+
return "InteropTestParams(transport=$transport, muxer=$muxer, security=$security, isDialer=$isDialer, ip=$ip, redisAddress=$redisAddress, testTimeoutInSeconds=$testTimeoutInSeconds)"
92+
}
93+
}

0 commit comments

Comments
 (0)