Skip to content

Commit 421ab9e

Browse files
authored
Fix working with short DNS names for k8s services (#7)
For example, k8s-dns:///my-svc.my-namespace should now work, provided that resolv.conf is configured with the correct search domains.
1 parent bb72205 commit 421ab9e

File tree

6 files changed

+265
-43
lines changed

6 files changed

+265
-43
lines changed

k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestAppShared.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ object TestAppShared {
139139
val values: Vector[TestClientTestCase] = Vector(
140140
DiscoverNewPod,
141141
DnsFailureRecover,
142+
ResolveShortDomainName,
142143
)
143144

144145
/**
@@ -165,5 +166,14 @@ object TestAppShared {
165166
* - check that after the configured reload TTL, the client sees both servers
166167
*/
167168
case object DnsFailureRecover extends TestClientTestCase
169+
170+
/**
171+
* [[TestClient]] test case verifying that `K8sDnsNameResolver` resolves short k8s
172+
* service domain names using resolv.conf search domains.
173+
*
174+
* For the reason why this needs a separate test case, see
175+
* [[com.evolution.jgrpc.tools.k8sdns.NameLookupState]]
176+
*/
177+
case object ResolveShortDomainName extends TestClientTestCase
168178
}
169179
}

k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestClient.scala

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ private[it] final class TestClient {
6363
testCaseDiscoverNewPod(fixture)
6464
case TestClientTestCase.DnsFailureRecover =>
6565
testCaseDnsFailureRecover(fixture)
66+
case TestClientTestCase.ResolveShortDomainName =>
67+
testCaseResolveShortDomainName(fixture)
6668
}
6769
}
6870

@@ -86,22 +88,22 @@ private[it] final class TestClient {
8688
private def testCaseDiscoverNewPod(fixture: Fixture): Unit = {
8789
fixture.coreDns.ensureStarted(serviceIps = Set(fixture.srv1Ip))
8890

89-
withRoundRobinLbClient { client =>
90-
callHost2TimesAssertServerIds(client, expectedServerIds = Set(1))
91+
withRoundRobinLbClient() { client =>
92+
callHostManyTimesAssertServerIds(client, expectedServerIds = Set(1))
9193

9294
fixture.coreDns.setServiceIps(Set(fixture.srv1Ip, fixture.srv2Ip))
9395

9496
sleepUntilClientGetsDnsUpdate()
9597

96-
callHost2TimesAssertServerIds(client, expectedServerIds = Set(1, 2))
98+
callHostManyTimesAssertServerIds(client, expectedServerIds = Set(1, 2))
9799
}
98100
}
99101

100102
private def testCaseDnsFailureRecover(fixture: Fixture): Unit = {
101103
fixture.coreDns.ensureStarted(serviceIps = Set(fixture.srv1Ip))
102104

103-
withRoundRobinLbClient { client =>
104-
callHost2TimesAssertServerIds(client, expectedServerIds = Set(1))
105+
withRoundRobinLbClient() { client =>
106+
callHostManyTimesAssertServerIds(client, expectedServerIds = Set(1))
105107

106108
fixture.coreDns.ensureStopped()
107109

@@ -111,7 +113,15 @@ private[it] final class TestClient {
111113

112114
sleepUntilClientGetsDnsUpdate()
113115

114-
callHost2TimesAssertServerIds(client, expectedServerIds = Set(1, 2))
116+
callHostManyTimesAssertServerIds(client, expectedServerIds = Set(1, 2))
117+
}
118+
}
119+
120+
private def testCaseResolveShortDomainName(fixture: Fixture): Unit = {
121+
fixture.coreDns.ensureStarted(serviceIps = Set(fixture.srv1Ip, fixture.srv2Ip))
122+
123+
withRoundRobinLbClient(targetHostname = svcHostnameShort) { client =>
124+
callHostManyTimesAssertServerIds(client, expectedServerIds = Set(1, 2))
115125
}
116126
}
117127

@@ -125,21 +135,44 @@ private[it] final class TestClient {
125135
Thread.sleep(sleepIntervalSeconds.toLong * 1000)
126136
}
127137

128-
private def callHost2TimesAssertServerIds(
138+
private def callHostManyTimesAssertServerIds(
129139
client: TestSvcBlockingStub,
130140
expectedServerIds: Set[Int],
131141
): Unit = {
132-
val actualServerIds = 0.until(2).map { _ =>
142+
require(expectedServerIds.subsetOf(allServerIds))
143+
144+
var observedServerIdsVec: Vector[Int] = Vector.fill(allServerIds.size) {
133145
client.getId(GetIdRequest()).id
134-
}.toSet
135-
if (actualServerIds != expectedServerIds) {
136-
sys.error(s"GRPC client observed server IDs $actualServerIds, expected $expectedServerIds")
146+
}
147+
148+
// When the client is just establishing connections, sometimes calling it multiple times
149+
// doesn't produce the round-robin call pattern (1, 2, 1, 2, ...),
150+
// but it appears as if the client
151+
// routes all the calls to the first opened connection,
152+
// while waiting for the rest to be fully ready.
153+
// So if we haven't observed all the servers yet, let's wait a bit and call again.
154+
// This helps to recover in such cases.
155+
if (observedServerIdsVec.toSet.size < allServerIds.size) {
156+
Thread.sleep(1000L)
157+
observedServerIdsVec = observedServerIdsVec ++ Vector.fill(allServerIds.size) {
158+
client.getId(GetIdRequest()).id
159+
}
160+
}
161+
162+
val observedServerIds = observedServerIdsVec.toSet
163+
if (observedServerIds != expectedServerIds) {
164+
sys.error(s"GRPC client expected server IDs $expectedServerIds, " +
165+
s"observed $observedServerIds (received in order: $observedServerIdsVec)")
137166
}
138167
}
139168

140-
private def withRoundRobinLbClient[T](body: TestSvcBlockingStub => T): T = {
169+
private def withRoundRobinLbClient[T](
170+
targetHostname: String = svcHostname,
171+
)(
172+
body: TestSvcBlockingStub => T,
173+
): T = {
141174
val channel = NettyChannelBuilder
142-
.forTarget(s"k8s-dns://$svcHostname:${ TestAppShared.ServerPort }")
175+
.forTarget(s"k8s-dns://$targetHostname:${ TestAppShared.ServerPort }")
143176
.usePlaintext()
144177
.defaultLoadBalancingPolicy("round_robin")
145178
.build()
@@ -156,7 +189,10 @@ private[it] final class TestClient {
156189
}
157190

158191
private object TestClient {
159-
private val svcHostname: String = "svc.example.org"
192+
private val allServerIds = Set(1, 2)
193+
private val clusterHostnameSuffix = "svc.cluster.local"
194+
private val svcHostnameShort = "acme-grpc.acme"
195+
private val svcHostname = s"$svcHostnameShort.$clusterHostnameSuffix"
160196
private val resolveConfPath = "/etc/resolv.conf"
161197

162198
private val coreDnsCoreFilePath = "/etc/coredns/CoreFile"
@@ -178,6 +214,8 @@ private object TestClient {
178214
Paths.get(resolveConfPath),
179215
Vector(
180216
"nameserver 127.0.0.1",
217+
s"search $clusterHostnameSuffix",
218+
"options ndots:5",
181219
).asJava,
182220
StandardOpenOption.TRUNCATE_EXISTING,
183221
)
@@ -224,7 +262,7 @@ private object TestClient {
224262
private def writeCoreFile(): Unit = {
225263
Files.writeString(
226264
Paths.get(coreDnsCoreFilePath),
227-
s"""$svcHostname {
265+
s""". {
228266
| hosts $coreDnsHostsFilePath {
229267
| ttl $coreDnsHostsReloadIntervalSeconds
230268
| reload ${ coreDnsHostsReloadIntervalSeconds }s

k8s-dns-name-resolver-it/src/test/scala/com/evolution/jgrpc/tools/k8sdns/it/K8sDnsNameResolverIt.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ class K8sDnsNameResolverIt extends AnyFreeSpec with BeforeAndAfterAll {
7272
"should recover after DNS query failure" in {
7373
runTestCase(TestClientTestCase.DnsFailureRecover)
7474
}
75+
76+
"should resolve short domain names with search domains" in {
77+
runTestCase(TestClientTestCase.ResolveShortDomainName)
78+
}
7579
}
7680

7781
private def runTestCase(testCase: TestClientTestCase): Unit = {

k8s-dns-name-resolver/src/main/java/com/evolution/jgrpc/tools/k8sdns/K8sDnsNameResolver.java

Lines changed: 12 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import static java.lang.Math.max;
44
import static java.lang.String.format;
55

6-
import com.google.common.net.InetAddresses;
76
import io.grpc.*;
87
import io.grpc.SynchronizationContext.ScheduledHandle;
98
import java.net.InetAddress;
@@ -12,18 +11,12 @@
1211
import java.time.Duration;
1312
import java.time.Instant;
1413
import java.util.List;
15-
import java.util.Optional;
1614
import java.util.concurrent.ScheduledExecutorService;
1715
import java.util.concurrent.TimeUnit;
1816
import java.util.function.BiConsumer;
1917
import org.jspecify.annotations.*;
2018
import org.slf4j.Logger;
2119
import org.slf4j.LoggerFactory;
22-
import org.xbill.DNS.Name;
23-
import org.xbill.DNS.Record;
24-
import org.xbill.DNS.Type;
25-
import org.xbill.DNS.lookup.LookupResult;
26-
import org.xbill.DNS.lookup.LookupSession;
2720

2821
/* package */ final class K8sDnsNameResolver extends NameResolver {
2922

@@ -33,7 +26,8 @@
3326
private final long refreshIntervalSeconds;
3427
private final SynchronizationContext syncCtx;
3528
private final ScheduledExecutorService scheduledExecutor;
36-
private final LookupSession dnsLookupSession;
29+
30+
private NameLookupState nameLookupState;
3731

3832
@Nullable private Listener listener = null;
3933

@@ -52,8 +46,7 @@ private record SuccessResult(List<InetAddress> addresses, Instant receiveTime) {
5246
this.refreshIntervalSeconds = refreshIntervalSeconds;
5347
this.syncCtx = syncCtx;
5448
this.scheduledExecutor = scheduledExecutor;
55-
this.dnsLookupSession =
56-
LookupSession.defaultBuilder().searchPath(targetUri.host()).clearCaches().build();
49+
this.nameLookupState = NameLookupState.initialize(targetUri.host());
5750
}
5851

5952
@Override
@@ -159,28 +152,21 @@ private EquivalentAddressGroup mkAddressGroup(InetAddress addr) {
159152
// callback is executed under syncCtx
160153
private void resolveAllAsync(
161154
BiConsumer<@Nullable List<InetAddress>, ? super @Nullable Throwable> cb) {
162-
final var dnsLookupAsyncResult = this.dnsLookupSession.lookupAsync(Name.empty, Type.A);
163-
dnsLookupAsyncResult
164-
.thenApply(
165-
(result) -> {
166-
logger.debug("DNS lookup result: {}", result);
167-
var records =
168-
Optional.ofNullable(result).map(LookupResult::getRecords).orElse(List.of());
169-
return records.stream()
170-
.map(Record::rdataToString)
171-
.distinct()
172-
.sorted() // make sure that result comparison does not depend on order
173-
.map(InetAddresses::forString)
174-
.toList();
175-
})
155+
nameLookupState
156+
.runNextLookup()
176157
.whenComplete(
177-
(addresses, err) ->
158+
(nameLookupState, err) ->
178159
this.syncCtx.execute(
179160
() -> {
180161
if (err != null) {
181162
logger.error("DNS lookup failed", err);
163+
cb.accept(null, err);
164+
} else {
165+
this.nameLookupState = nameLookupState;
166+
logger.debug(
167+
"DNS lookup successful {}", this.nameLookupState.getLastResult());
168+
cb.accept(this.nameLookupState.getLastResult(), null);
182169
}
183-
cb.accept(addresses, err);
184170
}));
185171
}
186172

0 commit comments

Comments
 (0)