Skip to content

Commit fb6c779

Browse files
kcf - enable use of fieldref to inject action pod details to pod env (#4859)
* update env var to load dynamic value * make fieldref env vars configurable * cleanup * cleanup Co-authored-by: Duy Nguyen <[email protected]>
1 parent 42474d3 commit fb6c779

File tree

4 files changed

+70
-35
lines changed

4 files changed

+70
-35
lines changed

core/invoker/src/main/resources/application.conf

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,14 @@ whisk {
9898
# max-millicpus = 4000
9999
#}
100100

101+
# Action pods can be injected with pod data using field refs to the pod spec (aka The Downward API):
102+
# https://kubernetes.io/docs/tasks/inject-data-application/environment-variable-expose-pod-information/#use-pod-fields-as-values-for-environment-variables
103+
#field-ref-environment: {
104+
# "POD_NAMESPACE":"metadata.namespace",
105+
# "POD_NAME":"metadata.name",
106+
# "POD_UID": "metadata.uid"
107+
#}
108+
101109
#enable PodDisruptionBudget creation for pods? (will include same labels as pods, and specify minAvailable=1 to prevent termination of action pods during maintenance)
102110
pdb-enabled = false
103111
}

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesClient.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig,
9292
actionNamespace: Option[String],
9393
podTemplate: Option[ConfigMapValue],
9494
cpuScaling: Option[KubernetesCpuScalingConfig],
95-
pdbEnabled: Boolean)
95+
pdbEnabled: Boolean,
96+
fieldRefEnvironment: Option[Map[String, String]])
9697

9798
/**
9899
* Serves as an interface to the Kubernetes API by proxying its REST API and/or invoking the kubectl CLI.
@@ -118,7 +119,7 @@ class KubernetesClient(
118119
new DefaultKubernetesClient(configBuilder.build())
119120
}
120121

121-
private val podBuilder = new WhiskPodBuilder(kubeRestClient, config.userPodNodeAffinity, config.podTemplate)
122+
private val podBuilder = new WhiskPodBuilder(kubeRestClient, config)
122123

123124
def run(name: String,
124125
image: String,

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/WhiskPodBuilder.scala

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,26 +25,25 @@ import io.fabric8.kubernetes.api.model.policy.{PodDisruptionBudget, PodDisruptio
2525
import io.fabric8.kubernetes.api.model.{
2626
ContainerBuilder,
2727
EnvVarBuilder,
28+
EnvVarSourceBuilder,
2829
IntOrString,
2930
LabelSelectorBuilder,
3031
Pod,
3132
PodBuilder,
3233
Quantity
3334
}
3435
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
35-
import org.apache.openwhisk.common.{ConfigMapValue, TransactionId}
36+
import org.apache.openwhisk.common.TransactionId
3637
import org.apache.openwhisk.core.entity.ByteSize
3738

3839
import scala.collection.JavaConverters._
3940

40-
class WhiskPodBuilder(client: NamespacedKubernetesClient,
41-
userPodNodeAffinity: KubernetesInvokerNodeAffinity,
42-
podTemplate: Option[ConfigMapValue] = None) {
43-
private val template = podTemplate.map(_.value.getBytes(UTF_8))
41+
class WhiskPodBuilder(client: NamespacedKubernetesClient, config: KubernetesClientConfig) {
42+
private val template = config.podTemplate.map(_.value.getBytes(UTF_8))
4443
private val actionContainerName = "user-action"
4544
private val actionContainerPredicate: Predicate[ContainerBuilder] = (cb) => cb.getName == actionContainerName
4645

47-
def affinityEnabled: Boolean = userPodNodeAffinity.enabled
46+
def affinityEnabled: Boolean = config.userPodNodeAffinity.enabled
4847

4948
def buildPodSpec(
5049
name: String,
@@ -55,7 +54,15 @@ class WhiskPodBuilder(client: NamespacedKubernetesClient,
5554
config: KubernetesClientConfig)(implicit transid: TransactionId): (Pod, Option[PodDisruptionBudget]) = {
5655
val envVars = environment.map {
5756
case (key, value) => new EnvVarBuilder().withName(key).withValue(value).build()
58-
}.toSeq
57+
}.toSeq ++ config.fieldRefEnvironment
58+
.map(_.map({
59+
case (key, value) =>
60+
new EnvVarBuilder()
61+
.withName(key)
62+
.withValueFrom(new EnvVarSourceBuilder().withNewFieldRef().withFieldPath(value).endFieldRef().build())
63+
.build()
64+
}).toSeq)
65+
.getOrElse(Seq.empty)
5966

6067
val baseBuilder = template match {
6168
case Some(bytes) =>
@@ -73,17 +80,17 @@ class WhiskPodBuilder(client: NamespacedKubernetesClient,
7380

7481
val specBuilder = pb1.editOrNewSpec().withRestartPolicy("Always")
7582

76-
if (userPodNodeAffinity.enabled) {
83+
if (config.userPodNodeAffinity.enabled) {
7784
val affinity = specBuilder
7885
.editOrNewAffinity()
7986
.editOrNewNodeAffinity()
8087
.editOrNewRequiredDuringSchedulingIgnoredDuringExecution()
8188
affinity
8289
.addNewNodeSelectorTerm()
8390
.addNewMatchExpression()
84-
.withKey(userPodNodeAffinity.key)
91+
.withKey(config.userPodNodeAffinity.key)
8592
.withOperator("In")
86-
.withValues(userPodNodeAffinity.value)
93+
.withValues(config.userPodNodeAffinity.value)
8794
.endMatchExpression()
8895
.endNodeSelectorTerm()
8996
.endRequiredDuringSchedulingIgnoredDuringExecution()

tests/src/test/scala/org/apache/openwhisk/core/containerpool/kubernetes/test/WhiskPodBuilderTests.scala

Lines changed: 42 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,14 @@
1818
package org.apache.openwhisk.core.containerpool.kubernetes.test
1919

2020
import io.fabric8.kubernetes.api.model.policy.PodDisruptionBudgetBuilder
21-
import io.fabric8.kubernetes.api.model.{IntOrString, LabelSelectorBuilder, Pod}
21+
import io.fabric8.kubernetes.api.model.{
22+
EnvVar,
23+
EnvVarSource,
24+
IntOrString,
25+
LabelSelectorBuilder,
26+
ObjectFieldSelector,
27+
Pod
28+
}
2229
import io.fabric8.kubernetes.client.utils.Serialization
2330
import org.apache.openwhisk.common.{ConfigMapValue, TransactionId}
2431
import org.apache.openwhisk.core.containerpool.kubernetes.{
@@ -46,20 +53,33 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport
4653

4754
behavior of "WhiskPodBuilder"
4855

56+
def config(configMap: Option[ConfigMapValue] = None, affinity: Option[KubernetesInvokerNodeAffinity] = None) =
57+
KubernetesClientConfig(
58+
KubernetesClientTimeoutConfig(1.seconds, 2.seconds),
59+
affinity.getOrElse(KubernetesInvokerNodeAffinity(false, "k", "v")),
60+
false,
61+
None,
62+
configMap,
63+
Some(KubernetesCpuScalingConfig(300, 3.MB, 1000)),
64+
false,
65+
Some(Map("POD_UID" -> "metadata.uid")))
66+
4967
it should "build a new pod" in {
50-
val builder = new WhiskPodBuilder(kubeClient, affinity)
51-
assertPodSettings(builder)
68+
val c = config()
69+
val builder = new WhiskPodBuilder(kubeClient, c)
70+
assertPodSettings(builder, c)
5271
}
5372
it should "build set cpu scaled based on memory, if enabled in configuration" in {
54-
val builder = new WhiskPodBuilder(kubeClient, affinity)
5573
val config = KubernetesClientConfig(
5674
KubernetesClientTimeoutConfig(1.second, 1.second),
5775
KubernetesInvokerNodeAffinity(false, "k", "v"),
5876
true,
5977
None,
6078
None,
6179
Some(KubernetesCpuScalingConfig(300, 3.MB, 1000)),
62-
false)
80+
false,
81+
None)
82+
val builder = new WhiskPodBuilder(kubeClient, config)
6383

6484
val (pod, _) = builder.buildPodSpec(name, testImage, 2.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config)
6585
withClue(Serialization.asYaml(pod)) {
@@ -89,7 +109,8 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport
89109
None,
90110
None,
91111
None,
92-
false)
112+
false,
113+
None)
93114
val (pod4, _) = builder.buildPodSpec(name, testImage, 7.MB, Map("foo" -> "bar"), Map("fooL" -> "barV"), config2)
94115
withClue(Serialization.asYaml(pod4)) {
95116
val c = getActionContainer(pod4)
@@ -121,8 +142,9 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport
121142
| image : "busybox"
122143
|""".stripMargin
123144

124-
val builder = new WhiskPodBuilder(kubeClient, affinity.copy(enabled = false), Some(ConfigMapValue(template)))
125-
val pod = assertPodSettings(builder)
145+
val c = config(Some(ConfigMapValue(template)))
146+
val builder = new WhiskPodBuilder(kubeClient, c)
147+
val pod = assertPodSettings(builder, c)
126148

127149
val ac = getActionContainer(pod)
128150
ac.getSecurityContext.getCapabilities.getDrop.asScala should contain("TEST_CAP")
@@ -136,8 +158,9 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport
136158
}
137159

138160
it should "build a pod disruption budget for the pod, if enabled" in {
139-
val builder = new WhiskPodBuilder(kubeClient, affinity)
140-
assertPodSettings(builder, true)
161+
val c = config()
162+
val builder = new WhiskPodBuilder(kubeClient, c)
163+
assertPodSettings(builder, c)
141164
}
142165

143166
it should "extend existing pod template with affinity" in {
@@ -155,28 +178,24 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport
155178
| values:
156179
| - "test"""".stripMargin
157180

158-
val builder = new WhiskPodBuilder(kubeClient, affinity.copy(enabled = true), Some(ConfigMapValue(template)))
159-
val pod = assertPodSettings(builder)
181+
val c = config(Some(ConfigMapValue(template)), Some(affinity.copy(enabled = true)))
182+
val builder =
183+
new WhiskPodBuilder(kubeClient, c)
184+
val pod = assertPodSettings(builder, c)
160185

161186
val terms =
162187
pod.getSpec.getAffinity.getNodeAffinity.getRequiredDuringSchedulingIgnoredDuringExecution.getNodeSelectorTerms.asScala
163188
terms.exists(_.getMatchExpressions.asScala.exists(_.getKey == "nodelabel")) shouldBe true
164189
}
165190

166-
private def assertPodSettings(builder: WhiskPodBuilder, pdbEnabled: Boolean = false): Pod = {
167-
val config = KubernetesClientConfig(
168-
KubernetesClientTimeoutConfig(1.second, 1.second),
169-
KubernetesInvokerNodeAffinity(false, "k", "v"),
170-
true,
171-
None,
172-
None,
173-
Some(KubernetesCpuScalingConfig(300, 3.MB, 1000)),
174-
pdbEnabled)
191+
private def assertPodSettings(builder: WhiskPodBuilder, config: KubernetesClientConfig): Pod = {
175192
val labels = Map("fooL" -> "barV")
176193
val (pod, pdb) = builder.buildPodSpec(name, testImage, memLimit, Map("foo" -> "bar"), labels, config)
177194
withClue(Serialization.asYaml(pod)) {
178195
val c = getActionContainer(pod)
179-
c.getEnv.asScala.exists(_.getName == "foo") shouldBe true
196+
c.getEnv.asScala.shouldBe(Seq(
197+
new EnvVar("foo", "bar", null),
198+
new EnvVar("POD_UID", null, new EnvVarSource(null, new ObjectFieldSelector(null, "metadata.uid"), null, null))))
180199

181200
c.getResources.getLimits.asScala.get("memory").map(_.getAmount) shouldBe Some("10Mi")
182201
c.getResources.getLimits.asScala.get("cpu").map(_.getAmount) shouldBe Some("900m")
@@ -195,7 +214,7 @@ class WhiskPodBuilderTests extends FlatSpec with Matchers with KubeClientSupport
195214
terms.exists(_.getMatchExpressions.asScala.exists(_.getKey == affinity.key)) shouldBe true
196215
}
197216
}
198-
if (pdbEnabled) {
217+
if (config.pdbEnabled) {
199218
println("matching pdb...")
200219
pdb shouldBe Some(
201220
new PodDisruptionBudgetBuilder().withNewMetadata

0 commit comments

Comments
 (0)