Skip to content

Commit 3399d92

Browse files
committed
changes to executor pod allocator
1 parent 6e48ef2 commit 3399d92

File tree

1 file changed

+21
-22
lines changed

1 file changed

+21
-22
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,12 @@ private[spark] class ExecutorPodsAllocator(
8383
newlyCreatedExecutors.foreach { case (execId, timeCreated) =>
8484
val currentTime = clock.getTimeMillis()
8585
if (currentTime - timeCreated > podCreationTimeout) {
86-
safeLogWarning("Executor with id {execId} was not detected in the Kubernetes" +
87-
" cluster after {podCreationTimeout} milliseconds despite the fact that a" +
86+
safeLogWarning("Executor was not detected in the Kubernetes" +
87+
" cluster after timeout despite the fact that a" +
8888
" previous allocation attempt tried to create it. The executor may have been" +
8989
" deleted but the application missed the deletion event.",
90-
SafeArg.of("execId", execId),
91-
SafeArg.of("podCreationTimeout", podCreationTimeout))
90+
SafeArg.of("executorId", execId),
91+
SafeArg.of("podCreationTimeoutMs", podCreationTimeout))
9292
Utils.tryLogNonFatalError {
9393
kubernetesClient
9494
.pods()
@@ -97,10 +97,10 @@ private[spark] class ExecutorPodsAllocator(
9797
}
9898
newlyCreatedExecutors -= execId
9999
} else {
100-
safeLogDebug("Executor with id {execId} was not found in the Kubernetes cluster since it" +
101-
" was created {timeSinceCreation} milliseconds ago.",
102-
SafeArg.of("execId", execId),
103-
SafeArg.of("timeSinceCreation", currentTime - timeCreated))
100+
safeLogDebug("Executor was not found in the Kubernetes cluster since it" +
101+
" was created some time ago.",
102+
SafeArg.of("executorId", execId),
103+
SafeArg.of("timeSinceCreationMs", currentTime - timeCreated))
104104
}
105105
}
106106

@@ -117,18 +117,18 @@ private[spark] class ExecutorPodsAllocator(
117117
case _ => false
118118
}
119119
val currentTotalExpectedExecutors = totalExpectedExecutors.get
120-
safeLogDebug("Currently have {currentRunningExecutors} running executors and" +
121-
" {currentPendingExecutors} pending executors. {newlyCreatedExecutors} executors" +
120+
safeLogDebug("Currently have running executors and" +
121+
" pending executors. Newly created executors executors" +
122122
" have been requested but are pending appearance in the cluster.",
123-
SafeArg.of("currentRunningExecutors", currentRunningExecutors),
124-
SafeArg.of("currentPendingExecutors", currentPendingExecutors),
123+
SafeArg.of("numCurrentRunningExecutors", currentRunningExecutors),
124+
SafeArg.of("numCurrentPendingExecutors", currentPendingExecutors),
125125
SafeArg.of("newlyCreatedExecutors", newlyCreatedExecutors))
126126
if (newlyCreatedExecutors.isEmpty
127127
&& currentPendingExecutors == 0
128128
&& currentRunningExecutors < currentTotalExpectedExecutors) {
129129
val numExecutorsToAllocate = math.min(
130130
currentTotalExpectedExecutors - currentRunningExecutors, podAllocationSize)
131-
safeLogInfo("Going to request {numExecutorsToAllocate} executors from Kubernetes.",
131+
safeLogInfo("Going to request executors from Kubernetes.",
132132
SafeArg.of("numExecutorsToAllocate", numExecutorsToAllocate))
133133
for ( _ <- 0 until numExecutorsToAllocate) {
134134
val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
@@ -145,23 +145,22 @@ private[spark] class ExecutorPodsAllocator(
145145
.build()
146146
kubernetesClient.pods().create(podWithAttachedContainer)
147147
newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis()
148-
safeLogDebug("Requested executor with id {newExecutorId} from Kubernetes.",
148+
safeLogDebug("Requested executor from Kubernetes.",
149149
SafeArg.of("newExecutorId", newExecutorId))
150150
}
151151
} else if (currentRunningExecutors >= currentTotalExpectedExecutors) {
152152
// TODO handle edge cases if we end up with more running executors than expected.
153153
safeLogDebug("Current number of running executors is equal to the number of requested" +
154154
" executors. Not scaling up further.")
155155
} else if (newlyCreatedExecutors.nonEmpty || currentPendingExecutors != 0) {
156-
safeLogDebug("Still waiting for {currentWaitingExecutors}" +
157-
" executors to begin running before requesting for more executors. # of executors in" +
158-
" pending status in the cluster: {currentPendingExecutors}. # of executors that we have" +
159-
" created but we have not observed as being present in the cluster yet:" +
160-
" {newlyCreatedExecutors}.",
161-
SafeArg.of("currentWaitingExecutors",
156+
safeLogDebug("Still waiting for" +
157+
" executors to begin running before requesting for more executors, including executors" +
158+
" in pending status in the cluster, and executors that we have" +
159+
" created but we have not observed as being present in the cluster yet.",
160+
SafeArg.of("numCurrentWaitingExecutors",
162161
newlyCreatedExecutors.size + currentPendingExecutors),
163-
SafeArg.of("currentPendingExecutors", currentPendingExecutors),
164-
SafeArg.of("newlyCreatedExecutors", newlyCreatedExecutors.size))
162+
SafeArg.of("numCurrentPendingExecutors", currentPendingExecutors),
163+
SafeArg.of("numNewlyCreatedExecutors", newlyCreatedExecutors.size))
165164
}
166165
}
167166
}

0 commit comments

Comments
 (0)