Skip to content

Commit 16855c1

Browse files
committed
WIP
1 parent e5aa8b9 commit 16855c1

File tree

2 files changed

+49
-27
lines changed

2 files changed

+49
-27
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -87,7 +87,7 @@ lazy val k8sDnsNameResolverIt = project.in(file("k8s-dns-name-resolver-it"))
8787
// to manipulate its DNS settings
8888
Cmd("USER", "root"),
8989
// TODO: WIP do we really need bind-tools?
90-
ExecCmd("RUN", "apk", "add", "--no-cache", "bash", "lsof", "coredns", "bind-tools"),
90+
ExecCmd("RUN", "apk", "add", "--no-cache", "bash", "lsof", "coredns"),
9191
),
9292
dockerExposedPorts := Seq(9000), // Should match the test app GRPC server port.
9393
// The int test here needs the test app docker container staged before running the code.

k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestClient.scala

Lines changed: 48 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,6 @@
11
package com.evolution.jgrpc.tools.k8sdns.it
22

3+
import com.evolution.jgrpc.tools.k8sdns.K8sDnsNameResolverProvider
34
import com.evolution.jgrpc.tools.k8sdns.it.ItAppShared._
45
import io.grpc.netty.NettyChannelBuilder
56
import k8sdns.it.test_svc.TestSvcGrpc.TestSvcBlockingStub
@@ -68,43 +69,57 @@ private[it] final class TestClient {
6869
}
6970

7071
private def testCaseDiscoverNewPod(fixture: Fixture): Unit = {
71-
fixture.coreDns.ensureStarted()
72+
fixture.coreDns.ensureStarted(serviceIps = Set(fixture.srv1Ip))
7273

7374
withRoundRobinLbClient { client =>
74-
require(callServerNTimes(client, 2).toSet == Set(1))
75+
callHost2TimesAssertServerIds(client, expectedServerIds = Set(1))
7576

76-
fixture.coreDns.setServiceIps(Vector(fixture.srv1Ip, fixture.srv2Ip))
77+
fixture.coreDns.setServiceIps(Set(fixture.srv1Ip, fixture.srv2Ip))
7778

78-
Thread.sleep(15000L) // TODO: wait time?
79+
sleepUntilClientGetsDnsUpdate()
7980

80-
require(callServerNTimes(client, 2).toSet == Set(1, 2))
81+
callHost2TimesAssertServerIds(client, expectedServerIds = Set(1, 2))
8182
}
8283
}
8384

8485
private def testCaseDnsFailureRecover(fixture: Fixture): Unit = {
85-
fixture.coreDns.ensureStarted()
86+
fixture.coreDns.ensureStarted(serviceIps = Set(fixture.srv1Ip))
8687

8788
withRoundRobinLbClient { client =>
88-
require(callServerNTimes(client, 2).toSet == Set(1))
89+
callHost2TimesAssertServerIds(client, expectedServerIds = Set(1))
8990

9091
fixture.coreDns.ensureStopped()
9192

92-
fixture.coreDns.setServiceIps(Vector(fixture.srv1Ip, fixture.srv2Ip))
93+
sleepUntilClientGetsDnsUpdate()
9394

94-
Thread.sleep(15000L) // TODO: wait time?
95+
fixture.coreDns.ensureStarted(serviceIps = Set(fixture.srv1Ip, fixture.srv2Ip))
9596

96-
fixture.coreDns.ensureStarted()
97+
sleepUntilClientGetsDnsUpdate()
9798

98-
Thread.sleep(15000L) // TODO: wait time?
99-
100-
require(callServerNTimes(client, 2).toSet == Set(1, 2))
99+
callHost2TimesAssertServerIds(client, expectedServerIds = Set(1, 2))
101100
}
102101
}
103102

104-
private def callServerNTimes(client: TestSvcBlockingStub, times: Int): Vector[Int] = {
105-
0.until(times).map { _ =>
103+
private def sleepUntilClientGetsDnsUpdate(): Unit = {
104+
val sleepIntervalSeconds =
105+
coreDnsHostsReloadIntervalSeconds +
106+
K8sDnsNameResolverProvider.DEFAULT_REFRESH_INTERVAL_SECONDS +
107+
2 // adding 2 seconds on top just in case
108+
109+
println(s"Sleeping until GRPC client gets DNS update: $sleepIntervalSeconds seconds")
110+
Thread.sleep(sleepIntervalSeconds.toLong * 1000)
111+
}
112+
113+
private def callHost2TimesAssertServerIds(
114+
client: TestSvcBlockingStub,
115+
expectedServerIds: Set[Int],
116+
): Unit = {
117+
val actualServerIds = 0.until(2).map { _ =>
106118
client.getId(GetIdRequest()).id
107-
}.toVector
119+
}.toSet
120+
if (actualServerIds != expectedServerIds) {
121+
sys.error(s"GRPC client observed server IDs $actualServerIds, expected $expectedServerIds")
122+
}
108123
}
109124

110125
private def withRoundRobinLbClient[T](body: TestSvcBlockingStub => T): T = {
@@ -132,11 +147,12 @@ private object TestClient {
132147
private val coreDnsHealthEndpointPort = 8080
133148
private val coreDnsReadyTimeout = 10.seconds
134149
private val coreDnsReadyCheckAttemptDelay = 2.seconds
150+
private val coreDnsHostsReloadIntervalSeconds = 2
135151

136152
private final class Fixture {
137153
val srv1Ip: String = InetAddress.getByName(TestAppSvcNames.Server1).getHostAddress
138154
val srv2Ip: String = InetAddress.getByName(TestAppSvcNames.Server2).getHostAddress
139-
val coreDns: CoreDns = new CoreDns(defaultServiceIps = Vector(srv1Ip))
155+
val coreDns: CoreDns = new CoreDns
140156
setCoreDnsAsPrimaryDns()
141157
}
142158

@@ -177,11 +193,11 @@ private object TestClient {
177193
}
178194
}
179195

180-
private final class CoreDns(defaultServiceIps: Vector[String]) {
196+
private final class CoreDns {
181197
private val healthCheckHttpClient = HttpClient.newHttpClient()
182198

183199
writeCoreFile()
184-
setServiceIps(defaultServiceIps)
200+
writeHostsFile(ips = Set.empty)
185201

186202
private var processOpt: Option[CoreDns.StartedProcess] = None
187203

@@ -190,10 +206,9 @@ private object TestClient {
190206
Paths.get(coreDnsCoreFilePath),
191207
s"""$svcHostname {
192208
| hosts $coreDnsHostsFilePath {
193-
| ttl 5
194-
| reload 5s
209+
| ttl $coreDnsHostsReloadIntervalSeconds
210+
| reload ${ coreDnsHostsReloadIntervalSeconds }s
195211
| }
196-
| reload 5s 1s
197212
| errors # show errors
198213
| log # enable query logs
199214
| health :$coreDnsHealthEndpointPort # enable healthcheck HTTP endpoint
@@ -240,7 +255,8 @@ private object TestClient {
240255
}
241256
}
242257

243-
def ensureStarted(): Unit = {
258+
def ensureStarted(serviceIps: Set[String]): Unit = {
259+
setServiceIps(serviceIps)
244260
processOpt match {
245261
case None =>
246262
startNewProcess()
@@ -256,8 +272,8 @@ private object TestClient {
256272
case Some(process) if process.sysProcess.isAlive() =>
257273
process.sysProcess.destroy() // TODO: does it wait for the process to stop?
258274
case _ =>
259-
processOpt = None
260275
}
276+
processOpt = None
261277
}
262278

263279
private def startNewProcess(): Unit = {
@@ -284,19 +300,25 @@ private object TestClient {
284300
}
285301
}
286302

287-
def setServiceIps(ips: Vector[String]): Unit = {
303+
private def writeHostsFile(ips: Set[String]): Unit = {
288304
// overwriting hosts file atomically so CoreDNS couldn't observe broken file content
289305

290306
val tmpHostsFile = Files.createTempFile("hosts", "txt")
291307
Files.write(
292308
tmpHostsFile,
293-
ips.map(ip => s"$ip $svcHostname").asJava,
309+
ips.toVector.sorted.map(ip => s"$ip $svcHostname").asJava,
294310
StandardOpenOption.CREATE,
295311
StandardOpenOption.TRUNCATE_EXISTING,
296312
)
297313

298314
Files.move(tmpHostsFile, Paths.get(coreDnsHostsFilePath), StandardCopyOption.ATOMIC_MOVE)
299315
}
316+
317+
def setServiceIps(ips: Set[String]): Unit = {
318+
writeHostsFile(ips)
319+
// force CoreDNS to reread configs just to be sure
320+
processOpt.foreach(p => sys.process.Process(s"kill -HUP ${ p.pid }").!!)
321+
}
300322
}
301323

302324
private object CoreDns {

0 commit comments

Comments
 (0)