Skip to content

Commit c015e69

Browse files
committed
WIP
1 parent 410675a commit c015e69

File tree

11 files changed

+439
-0
lines changed

11 files changed

+439
-0
lines changed

build.sbt

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import Dependencies.*
2+
import com.typesafe.sbt.packager.docker.{Cmd, ExecCmd}
23
import sbt.*
34

45
ThisBuild / organization := "com.evolution.jgrpc.tools"
@@ -55,6 +56,7 @@ lazy val root = project.in(file("."))
5556
)
5657
.aggregate(
5758
k8sDnsNameResolver,
59+
k8sDnsNameResolverIt,
5860
)
5961

6062
lazy val k8sDnsNameResolver = project.in(file("k8s-dns-name-resolver"))
@@ -68,6 +70,40 @@ lazy val k8sDnsNameResolver = project.in(file("k8s-dns-name-resolver"))
6870
),
6971
)
7072

73+
// TODO: WIP
74+
lazy val k8sDnsNameResolverIt = project.in(file("k8s-dns-name-resolver-it"))
75+
.enablePlugins(JavaAppPackaging, DockerPlugin)
76+
.settings(
77+
name := "k8s-dns-name-resolver-it",
78+
description := "Evolution grpc-java tools - DNS-based name resolver for Kubernetes services - integration tests",
79+
publish / skip := true,
80+
autoScalaLibrary := true,
81+
Compile / PB.targets := Seq(
82+
scalapb.gen() -> (Compile / sourceManaged).value / "scalapb",
83+
),
84+
dockerBaseImage := "amazoncorretto:17-alpine",
85+
dockerCommands ++= Seq(
86+
// TODO
87+
Cmd("USER", "root"),
88+
ExecCmd("RUN", "apk", "add", "--no-cache", "bash"),
89+
ExecCmd("RUN", "apk", "add", "--no-cache", "coredns", "bind-tools"),
90+
),
91+
dockerExposedPorts ++= Seq(9000),
92+
test := {
93+
(Docker / stage).value
94+
(Test / test).value
95+
},
96+
libraryDependencies ++= Seq(
97+
"io.grpc" % "grpc-netty" % scalapb.compiler.Version.grpcJavaVersion,
98+
"com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion,
99+
Slf4j.simple,
100+
"org.testcontainers" % "testcontainers" % "2.0.2" % Test,
101+
"org.testcontainers" % "testcontainers-junit-jupiter" % "2.0.2" % Test,
102+
),
103+
).dependsOn(
104+
k8sDnsNameResolver,
105+
)
106+
71107
addCommandAlias("fmt", "all scalafmtAll scalafmtSbt javafmtAll")
72108
addCommandAlias(
73109
"build",
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
syntax = "proto2";
2+
3+
package k8sdns.it;
4+
5+
service TestSvc {
6+
rpc GetId (GetIdRequest) returns (GetIdReply) {}
7+
}
8+
9+
message GetIdRequest {}
10+
11+
message GetIdReply {
12+
required int32 id = 1;
13+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package com.evolution.jgrpc.tools.k8sdns.it
2+
3+
import k8sdns.it.test_svc._
4+
5+
// TODO: WIP clean up and refactor code
6+
// TODO: WIP set up scalac options
7+
8+
object ItApp extends App {
9+
private val runModeEnvVarName = "TEST_SVC_RUN_MODE"
10+
private val instanceIdVarName = "TEST_SVC_INSTANCE_ID"
11+
12+
sys.env.get(runModeEnvVarName) match {
13+
case None =>
14+
sys.error(s"missing environment variable: $runModeEnvVarName")
15+
case Some("server") =>
16+
runServer()
17+
case Some("client") =>
18+
runClient()
19+
case Some(unexpectedRunMode) =>
20+
sys.error(s"unexpected run mode: $unexpectedRunMode")
21+
}
22+
23+
private def runServer(): Unit = {
24+
val instanceId = sys.env.getOrElse(
25+
instanceIdVarName,
26+
sys.error(s"missing environment variable: $instanceIdVarName"),
27+
).toInt
28+
29+
new TestServer(instanceId = instanceId).run()
30+
}
31+
32+
private def runClient(): Unit = {
33+
new TestClient().run()
34+
}
35+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.evolution.jgrpc.tools.k8sdns.it
2+
3+
import java.nio.file.{Path, Paths}
4+
5+
object K8sDnsItAppShared {
6+
val ServerPort: Int = 9000
7+
8+
val ServerStartedLogMessagePrefix: String = "server started"
9+
10+
val StartClientTestFlagFilePath: Path = Paths.get("/tmp/.start-test-flag")
11+
12+
val ClientTestSuccessLogMessage: String = "TEST SUCCESS"
13+
val ClientTestFailedLogMessage: String = "TEST FAILED"
14+
}
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
package com.evolution.jgrpc.tools.k8sdns.it
2+
3+
import io.grpc.netty.NettyChannelBuilder
4+
import k8sdns.it.test_svc.TestSvcGrpc.TestSvcBlockingStub
5+
import k8sdns.it.test_svc.{GetIdRequest, TestSvcGrpc}
6+
7+
import java.net.InetAddress
8+
import java.nio.file.{Files, Paths, StandardOpenOption}
9+
import scala.jdk.CollectionConverters._
10+
11+
private[it] final class TestClient {
12+
13+
private val svcHostname: String = "svc.example.org"
14+
15+
def run(): Unit = {
16+
try {
17+
waitForStartTestFlagFile()
18+
runTest()
19+
println(K8sDnsItAppShared.ClientTestSuccessLogMessage)
20+
} catch {
21+
case t: Throwable =>
22+
t.printStackTrace()
23+
println(K8sDnsItAppShared.ClientTestFailedLogMessage)
24+
sys.exit(1)
25+
}
26+
}
27+
28+
private def runTest(): Unit = {
29+
val srv1Ip = InetAddress.getByName("test-server1").getHostAddress
30+
val srv2Ip = InetAddress.getByName("test-server2").getHostAddress
31+
println(srv1Ip)
32+
println(srv2Ip)
33+
34+
writeCoreDnsCoreFile()
35+
addServiceIpToCoreDnsHostsFile(srv1Ip)
36+
37+
sys.process.Process("coredns -conf /etc/coredns/CoreFile").run()
38+
Files.write(
39+
Paths.get("/etc/resolv.conf"),
40+
Vector(
41+
"nameserver 127.0.0.1",
42+
).asJava,
43+
StandardOpenOption.TRUNCATE_EXISTING,
44+
)
45+
Thread.sleep(4000L)
46+
// while (true) {
47+
// sys.process.Process("nslookup svc.example.org").!
48+
// Thread.sleep(4000L)
49+
// }
50+
51+
val channel = NettyChannelBuilder
52+
.forTarget(s"k8s-dns://$svcHostname:${ K8sDnsItAppShared.ServerPort }")
53+
.usePlaintext()
54+
.defaultLoadBalancingPolicy("round_robin")
55+
.build()
56+
57+
val client = TestSvcGrpc.blockingStub(channel)
58+
59+
require(callServerNTimes(client, 2).toSet == Set(1))
60+
61+
addServiceIpToCoreDnsHostsFile(srv2Ip)
62+
63+
Thread.sleep(15000L)
64+
65+
require(callServerNTimes(client, 2).toSet == Set(1, 2))
66+
}
67+
68+
private def waitForStartTestFlagFile(): Unit = {
69+
println("waiting for flag")
70+
while (!Files.exists(K8sDnsItAppShared.StartClientTestFlagFilePath)) {
71+
Thread.sleep(1000L)
72+
}
73+
println("waiting for flag - finished")
74+
}
75+
76+
private def writeCoreDnsCoreFile(): Unit = {
77+
Files.writeString(
78+
Paths.get("/etc/coredns/CoreFile"),
79+
s"""$svcHostname {
80+
| hosts /etc/coredns/hosts {
81+
| ttl 5
82+
| reload 5s
83+
| }
84+
| reload 5s 1s
85+
| errors # show errors
86+
| log # enable query logs
87+
|}
88+
|""".stripMargin,
89+
)
90+
}
91+
92+
private def addServiceIpToCoreDnsHostsFile(ip: String): Unit = {
93+
Files.write(
94+
Paths.get("/etc/coredns/hosts"),
95+
Vector(
96+
s"$ip $svcHostname",
97+
).asJava,
98+
StandardOpenOption.CREATE,
99+
StandardOpenOption.APPEND,
100+
)
101+
}
102+
103+
// private val srv1Ip = InetAddress.getByName("test-server1").getHostAddress
104+
// private val srv2Ip = InetAddress.getByName("test-server2").getHostAddress
105+
// println(srv1Ip)
106+
// println(srv2Ip)
107+
//
108+
// Files.writeString(
109+
// Paths.get("/etc/coredns/CoreFile"),
110+
// """example.org {
111+
// | hosts /etc/coredns/hosts {
112+
// | ttl 5
113+
// | reload 5s
114+
// | }
115+
// | reload 5s 1s
116+
// | errors # show errors
117+
// | log # enable query logs
118+
// |}
119+
// |""".stripMargin,
120+
// )
121+
//
122+
// Files.write(
123+
// Paths.get("/etc/coredns/hosts"),
124+
// Vector(
125+
// s"$srv1Ip svc.example.org",
126+
// ).asJava,
127+
// )
128+
//
129+
// sys.process.Process("coredns -conf /etc/coredns/CoreFile").run()
130+
// Files.write(
131+
// Paths.get("/etc/resolv.conf"),
132+
// Vector(
133+
// "nameserver 127.0.0.1",
134+
// ).asJava,
135+
// StandardOpenOption.TRUNCATE_EXISTING,
136+
// )
137+
// Thread.sleep(4000L)
138+
//// while (true) {
139+
//// sys.process.Process("nslookup svc.example.org").!
140+
//// Thread.sleep(4000L)
141+
//// }
142+
//
143+
//// Files.write(
144+
//// Paths.get("/etc/hosts"),
145+
//// Vector(
146+
//// s"$srv1Ip test-server",
147+
//// ).asJava,
148+
//// StandardOpenOption.APPEND,
149+
//// )
150+
//
151+
// private val channel = NettyChannelBuilder
152+
// .forTarget("k8s-dns://svc.example.org:9000")
153+
// .usePlaintext()
154+
// .defaultLoadBalancingPolicy("round_robin")
155+
// .build()
156+
//
157+
// private val client = TestSvcGrpc.blockingStub(channel)
158+
//
159+
// require(callServerNTimes(2).toSet == Set(1))
160+
//
161+
// Files.write(
162+
// Paths.get("/etc/coredns/hosts"),
163+
// Vector(
164+
// s"$srv2Ip svc.example.org",
165+
// ).asJava,
166+
// StandardOpenOption.APPEND,
167+
// )
168+
//
169+
// Thread.sleep(15000L)
170+
//
171+
// require(callServerNTimes(2).toSet == Set(1, 2))
172+
//
173+
// println("TEST SUCCESS")
174+
175+
private def callServerNTimes(client: TestSvcBlockingStub, times: Int): Vector[Int] = {
176+
0.until(times).map { _ =>
177+
client.getId(GetIdRequest()).id
178+
}.toVector
179+
}
180+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.evolution.jgrpc.tools.k8sdns.it
2+
3+
import io.grpc.netty.NettyServerBuilder
4+
import k8sdns.it.test_svc.TestSvcGrpc
5+
6+
import java.util.concurrent.TimeUnit
7+
import scala.concurrent.ExecutionContext
8+
9+
private[it] final class TestServer(instanceId: Int) {
10+
private val server = NettyServerBuilder
11+
.forPort(K8sDnsItAppShared.ServerPort)
12+
.maxConnectionIdle(10, TimeUnit.MINUTES)
13+
.addService(TestSvcGrpc.bindService(new TestSvcImpl(instanceId = instanceId), ExecutionContext.global))
14+
.build
15+
16+
def run(): Unit = {
17+
server.start()
18+
println(s"${ K8sDnsItAppShared.ServerStartedLogMessagePrefix }: $instanceId")
19+
server.awaitTermination()
20+
}
21+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.evolution.jgrpc.tools.k8sdns.it
2+
3+
import k8sdns.it.test_svc._
4+
5+
import scala.concurrent.Future
6+
7+
private[it] final class TestSvcImpl(instanceId: Int) extends TestSvcGrpc.TestSvc {
8+
override def getId(request: GetIdRequest): Future[GetIdReply] = {
9+
Future.successful(GetIdReply(id = instanceId))
10+
}
11+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
services:
2+
test-server1:
3+
&test-app
4+
build: ../../../target/docker/stage
5+
environment:
6+
TEST_SVC_RUN_MODE: server
7+
TEST_SVC_INSTANCE_ID: 1
8+
deploy:
9+
resources:
10+
limits:
11+
cpus: 1
12+
memory: 200M
13+
14+
test-server2:
15+
<<: *test-app
16+
environment:
17+
TEST_SVC_RUN_MODE: server
18+
TEST_SVC_INSTANCE_ID: 2
19+
20+
test-client:
21+
<<: *test-app
22+
environment:
23+
TEST_SVC_RUN_MODE: client

0 commit comments

Comments
 (0)