
Commit 0671395

jlpedrosa authored and srowen committed
[SPARK-27989][CORE] Added retries on the connection to the driver for k8s
Disabled negative DNS caching for Docker images. Improved logging on DNS resolution, which is convenient for slow k8s clusters.

## What changes were proposed in this pull request?

Added retries when building the connection to the driver in K8s. In some scenarios DNS resolution can take longer than the timeout. Also, openjdk-8 has negative DNS caching enabled by default, which means that even retries may not help, depending on the timing.

## How was this patch tested?

This patch was tested against a specific k8s cluster with slow DNS response times to ensure it works.

Closes apache#24702 from jlpedrosa/feature/kuberetries.

Authored-by: Jose Luis Pedrosa <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
1 parent 85e95b7 commit 0671395
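
The negative-DNS-caching change for the Docker images mentioned in the message is not visible in the two file diffs below. As background, OpenJDK caches failed host lookups according to the standard security property `networkaddress.cache.negative.ttl` (10 seconds by default). A minimal Scala sketch of that mechanism, illustrative only and not part of this patch:

```scala
import java.security.Security

// "networkaddress.cache.negative.ttl" controls how long the JVM caches
// *failed* DNS lookups: the default is 10 seconds, and "-1" caches them
// forever. Setting it to "0" disables negative caching, so a retry can
// succeed once the name becomes resolvable. It must run before the
// first failed lookup to be effective.
Security.setProperty("networkaddress.cache.negative.ttl", "0")
```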


2 files changed: +16 −3 lines changed


common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java

Lines changed: 3 additions & 2 deletions
```diff
@@ -172,10 +172,11 @@ public TransportClient createClient(String remoteHost, int remotePort)
     final long preResolveHost = System.nanoTime();
     final InetSocketAddress resolvedAddress = new InetSocketAddress(remoteHost, remotePort);
     final long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000;
+    final String resolvMsg = resolvedAddress.isUnresolved() ? "failed" : "succeed";
     if (hostResolveTimeMs > 2000) {
-      logger.warn("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
+      logger.warn("DNS resolution {} for {} took {} ms", resolvMsg, resolvedAddress, hostResolveTimeMs);
     } else {
-      logger.trace("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
+      logger.trace("DNS resolution {} for {} took {} ms", resolvMsg, resolvedAddress, hostResolveTimeMs);
     }

     synchronized (clientPool.locks[clientIndex]) {
```
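
For context on the new `resolvMsg` value: `new InetSocketAddress(host, port)` performs a blocking DNS lookup and, on failure, yields an "unresolved" address rather than throwing. A small standalone Scala sketch (the host name is a placeholder, not taken from the patch):

```scala
import java.net.InetSocketAddress

// The constructor resolves the host eagerly; a failed lookup produces
// an unresolved address instead of an exception, which is what the
// patched log message reports as "failed" vs. "succeed".
val addr = new InetSocketAddress("name-that-does-not-resolve.invalid", 7077)
println(if (addr.isUnresolved) "failed" else "succeed")
```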

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 13 additions & 1 deletion
```diff
@@ -262,7 +262,19 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       executorConf,
       new SecurityManager(executorConf),
       clientMode = true)
-    val driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
+
+    var driver: RpcEndpointRef = null
+    val nTries = 3
+    for (i <- 0 until nTries if driver == null) {
+      try {
+        driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
+      } catch {
+        case e: Throwable => if (i == nTries - 1) {
+          throw e
+        }
+      }
+    }
+
     val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
     val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", arguments.appId))
     fetcher.shutdown()
```
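
The loop above attempts `setupEndpointRefByURI` up to `nTries` times and rethrows only on the last attempt, swallowing earlier failures. The same behaviour can be written as a small recursive helper; this is an illustrative sketch with a hypothetical `retry` name, not Spark code:

```scala
// Hypothetical helper equivalent to the patch's retry loop: run `op`,
// and on any Throwable retry while attempts remain; the exception from
// the final attempt propagates to the caller.
def retry[T](maxTries: Int)(op: => T): T =
  try op
  catch {
    case _: Throwable if maxTries > 1 => retry(maxTries - 1)(op)
  }

// Usage matching the patch (3 attempts):
// val driver = retry(3)(fetcher.setupEndpointRefByURI(arguments.driverUrl))
```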
