Skip to content

Commit 7b640d5

Browse files
committed
add support for various k8s platform fields
This change provides sdk support for the following kubernetes platform fields: * configmap name for configmap as env & volume * secret name for configmap as env & volume * image pull secret name It also adds support for tolerations, node selectors as input json for the entire toleration & node selector spec. We opt for this solution because if a user were to try to use as input parameters individual key value pairs, the input parameter count would increase drastically. The implementation borrows the InputParamSpec from the pipeline spec package. This is copied over instead of imported due to cyclic dependency issues with proto imports and the current project structure. Additionally, pvc_mount is updated to accomodate this new spec, which makes it more in line & consistent with how input parameters are handled elsewhere in the spec. Finally, all old name parameter fields like configmap names, secret names, etc. are deprecated, as the runtime value proto message covers this via it's constant field. Signed-off-by: Humair Khan <[email protected]>
1 parent 6a13f4b commit 7b640d5

File tree

9 files changed

+1511
-528
lines changed

9 files changed

+1511
-528
lines changed

kubernetes_platform/go/kubernetesplatform/kubernetes_executor_config.pb.go

Lines changed: 1224 additions & 459 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

kubernetes_platform/proto/kubernetes_executor_config.proto

Lines changed: 145 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -49,17 +49,20 @@ message EnabledSharedMemory {
4949
}
5050

5151
message SecretAsVolume {
52-
// Name of the Secret.
53-
string secret_name = 1;
52+
// Deprecated, use secret_name_parameter instead.
53+
string secret_name = 1 [deprecated = true];
5454
// Container path to mount the Secret data.
5555
string mount_path = 2;
5656
// An optional boolean value indicating whether the Secret must be defined.
5757
optional bool optional = 3;
58+
59+
// Name of the Secret.
60+
InputParameterSpec secret_name_parameter = 4;
5861
}
5962

6063
message SecretAsEnv {
61-
// Name of the Secret.
62-
string secret_name = 1;
64+
// Deprecated, use secret_name_parameter instead.
65+
string secret_name = 1 [deprecated = true];
6366

6467
message SecretKeyToEnvMap {
6568
// Corresponds to a key of the Secret.data field.
@@ -69,31 +72,26 @@ message SecretAsEnv {
6972
}
7073

7174
repeated SecretKeyToEnvMap key_to_env = 2;
72-
}
73-
74-
// Represents an upstream task's output parameter.
75-
message TaskOutputParameterSpec {
76-
// The name of the upstream task which produces the output parameter that
77-
// matches with the `output_parameter_key`.
78-
string producer_task = 1;
7975

80-
// The key of [TaskOutputsSpec.parameters][] map of the producer task.
81-
string output_parameter_key = 2;
76+
// Name of the Secret.
77+
InputParameterSpec secret_name_parameter = 4;
8278
}
8379

8480
message PvcMount {
85-
// Identifier for the PVC.
86-
// Used like TaskInputsSpec.InputParameterSpec.kind.
81+
// Deprecated, use pvc_name_parameter instead.
8782
oneof pvc_reference {
8883
// Output parameter from an upstream task.
89-
TaskOutputParameterSpec task_output_parameter = 1;
84+
TaskOutputParameterSpec task_output_parameter = 1 [deprecated = true];
9085
// A constant value.
91-
string constant = 2;
86+
string constant = 2 [deprecated = true];
9287
// Pass the input parameter from parent component input parameter.
93-
string component_input_parameter = 3;
88+
string component_input_parameter = 3 [deprecated = true];
9489
}
9590
// Container path to which the PVC should be mounted.
9691
string mount_path = 4;
92+
93+
// Name of the PVC.
94+
InputParameterSpec pvc_name_parameter = 5;
9795
}
9896

9997
message CreatePvc {
@@ -136,6 +134,11 @@ message NodeSelector {
136134
// map of label key to label value
137135
// corresponds to Pod.spec.nodeSelector field https://kubernetes.io/docs/reference/kubernetes-api/workload-resources/pod-v1/#scheduling
138136
map<string, string> labels = 1;
137+
138+
// Provide a json struct of node selector
139+
// Takes precedence over labels.
140+
// Example: {"disk-type": "ssd", "region": "us-west"}
141+
InputParameterSpec node_selector_json = 2;
139142
}
140143

141144
message PodMetadata {
@@ -146,17 +149,20 @@ message PodMetadata {
146149
}
147150

148151
message ConfigMapAsVolume {
149-
// Name of the ConfigMap.
150-
string config_map_name = 1;
152+
// Deprecated, use config_name_parameter instead.
153+
string config_map_name = 1 [deprecated = true];
151154
// Container path to mount the ConfigMap data.
152155
string mount_path = 2;
153156
// An optional boolean value indicating whether the ConfigMap must be defined.
154157
optional bool optional = 3;
158+
159+
// Name of the ConfigMap.
160+
InputParameterSpec config_name_parameter = 4;
155161
}
156162

157163
message ConfigMapAsEnv {
158-
// Name of the ConfigMap.
159-
string config_map_name = 1;
164+
// Deprecated, use config_name_parameter instead.
165+
string config_map_name = 1 [deprecated = true];
160166

161167
message ConfigMapKeyToEnvMap {
162168
// Corresponds to a key of the ConfigMap.
@@ -166,6 +172,9 @@ message ConfigMapAsEnv {
166172
}
167173

168174
repeated ConfigMapKeyToEnvMap key_to_env = 2;
175+
176+
// Name of the ConfigMap.
177+
InputParameterSpec config_name_parameter = 3;
169178
}
170179

171180
message GenericEphemeralVolume {
@@ -190,7 +199,9 @@ message GenericEphemeralVolume {
190199

191200
message ImagePullSecret {
192201
// Name of the image pull secret.
193-
string secret_name = 1;
202+
string secret_name = 1 [deprecated = true];
203+
204+
InputParameterSpec secret_name_parameter = 2;
194205
}
195206

196207
message FieldPathAsEnv {
@@ -207,6 +218,14 @@ message Toleration {
207218
string value = 3;
208219
string effect = 4;
209220
optional int64 toleration_seconds = 5;
221+
222+
// Provide a json struct of the toleration
223+
// Takes precedence over key, operator, value, effect.
224+
// Example: {"key": "key1", "operator": "Equal", "value": "value1", "effect": "NoSchedule"}
225+
// The JSON must follow Kubernetes
226+
// Toleration structure:
227+
// https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#toleration-v1-core
228+
InputParameterSpec toleration_json = 6;
210229
}
211230

212231
// Matches https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#labelselectorrequirement-v1-meta and
@@ -244,3 +263,106 @@ message EmptyDirMount {
244263
optional string medium = 3;
245264
optional string size_limit = 4;
246265
}
266+
267+
// The proto messages below are copied from pipeline_spec.proto
268+
// and should match their structure.
269+
270+
// Represents an input parameter. The value can be taken from an upstream
271+
// task's output parameter (if specifying `producer_task` and
272+
// `output_parameter_key`, or it can be a runtime value, which can either be
273+
// determined at compile-time, or from a pipeline parameter.
274+
message InputParameterSpec {
275+
// Represents an upstream task's output parameter.
276+
message TaskOutputParameterSpec {
277+
// The name of the upstream task which produces the output parameter that
278+
// matches with the `output_parameter_key`.
279+
string producer_task = 1;
280+
281+
// The key of [TaskOutputsSpec.parameters][] map of the producer task.
282+
string output_parameter_key = 2;
283+
}
284+
285+
// Represents an upstream task's final status. The field can only be set if
286+
// the schema version is `2.0.0`. The resolved input parameter will be a
287+
// json payload in string type.
288+
message TaskFinalStatus {
289+
// The name of the upsteram task where the final status is coming from.
290+
string producer_task = 1;
291+
}
292+
293+
oneof kind {
294+
// Output parameter from an upstream task.
295+
TaskOutputParameterSpec task_output_parameter = 1;
296+
// A constant value or runtime parameter.
297+
ValueOrRuntimeParameter runtime_value = 2;
298+
// Pass the input parameter from parent component input parameter.
299+
string component_input_parameter = 3;
300+
// The final status of an upstream task.
301+
TaskFinalStatus task_final_status = 5;
302+
}
303+
304+
// Selector expression of Common Expression Language (CEL)
305+
// that applies to the parameter found from above kind.
306+
//
307+
// The expression is applied to the Value type
308+
// [Value][]. For example,
309+
// 'size(string_value)' will return the size of the Value.string_value.
310+
//
311+
// After applying the selection, the parameter will be returned as a
312+
// [Value][]. The type of the Value is either deferred from the input
313+
// definition in the corresponding
314+
// [ComponentSpec.input_definitions.parameters][], or if not found,
315+
// automatically deferred as either string value or double value.
316+
//
317+
// In addition to the builtin functions in CEL, The value.string_value can
318+
// be treated as a json string and parsed to the [google.protobuf.Value][]
319+
// proto message. Then, the CEL expression provided in this field will be
320+
// used to get the requested field. For examples,
321+
// - if Value.string_value is a json array of "[1.1, 2.2, 3.3]",
322+
// 'parseJson(string_value)[i]' will pass the ith parameter from the list
323+
// to the current task, or
324+
// - if the Value.string_value is a json map of "{"a": 1.1, "b": 2.2,
325+
// "c": 3.3}, 'parseJson(string_value)[key]' will pass the map value from
326+
// the struct map to the current task.
327+
//
328+
// If unset, the value will be passed directly to the current task.
329+
string parameter_expression_selector = 4;
330+
}
331+
332+
// Definition for a value or reference to a runtime parameter. A
333+
// ValueOrRuntimeParameter instance can be either a field value that is
334+
// determined during compilation time, or a runtime parameter which will be
335+
// determined during runtime.
336+
message ValueOrRuntimeParameter {
337+
oneof value {
338+
// Constant value which is determined in compile time.
339+
// Deprecated. Use [ValueOrRuntimeParameter.constant][] instead.
340+
Value constant_value = 1 [deprecated = true];
341+
// The runtime parameter refers to the parent component input parameter.
342+
string runtime_parameter = 2;
343+
// Constant value which is determined in compile time.
344+
google.protobuf.Value constant = 3;
345+
}
346+
}
347+
348+
// Value is the value of the field.
349+
message Value {
350+
oneof value {
351+
// An integer value
352+
int64 int_value = 1;
353+
// A double value
354+
double double_value = 2;
355+
// A string value
356+
string string_value = 3;
357+
}
358+
}
359+
360+
// Represents an upstream task's output parameter.
361+
message TaskOutputParameterSpec {
362+
// The name of the upstream task which produces the output parameter that
363+
// matches with the `output_parameter_key`.
364+
string producer_task = 1;
365+
366+
// The key of [TaskOutputsSpec.parameters][] map of the producer task.
367+
string output_parameter_key = 2;
368+
}

kubernetes_platform/python/kfp/kubernetes/common.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2023 The Kubeflow Authors
1+
# Copyright 2025 The Kubeflow Authors
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.
@@ -12,6 +12,11 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from typing import Union
16+
17+
from kfp.dsl import pipeline_channel
18+
from kfp.compiler.pipeline_spec_builder import to_protobuf_value
19+
from kfp.dsl import PipelineTask
1520
from google.protobuf import json_format
1621
from kfp.kubernetes import kubernetes_executor_config_pb2 as pb
1722

@@ -21,3 +26,29 @@ def get_existing_kubernetes_config_as_message(
2126
cur_k8_config_dict = task.platform_config.get('kubernetes', {})
2227
k8_config_msg = pb.KubernetesExecutorConfig()
2328
return json_format.ParseDict(cur_k8_config_dict, k8_config_msg)
29+
30+
31+
def parse_k8s_parameter_input(
32+
input_param: Union[pipeline_channel.PipelineParameterChannel, str, dict],
33+
task: PipelineTask,
34+
) -> pb.InputParameterSpec:
35+
param_spec = pb.InputParameterSpec()
36+
37+
if isinstance(input_param, (str, dict)):
38+
param_spec.runtime_value.constant.CopyFrom(to_protobuf_value(input_param))
39+
elif isinstance(input_param, pipeline_channel.PipelineParameterChannel):
40+
if input_param.task_name is None:
41+
param_spec.component_input_parameter = input_param.full_name
42+
43+
else:
44+
param_spec.task_output_parameter.producer_task = input_param.task_name
45+
param_spec.task_output_parameter.output_parameter_key = input_param.name
46+
if input_param.task:
47+
task.after(input_param.task)
48+
else:
49+
raise ValueError(
50+
f'Argument for {"input_param"!r} must be an instance of str, dict, or PipelineChannel. '
51+
f'Got unknown input type: {type(input_param)!r}.'
52+
)
53+
54+
return param_spec

kubernetes_platform/python/kfp/kubernetes/config_map.py

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,17 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from typing import Dict
15+
from typing import Dict, Union
1616

1717
from google.protobuf import json_format
18-
from kfp.dsl import PipelineTask
18+
from kfp.dsl import PipelineTask, pipeline_channel
1919
from kfp.kubernetes import common
2020
from kfp.kubernetes import kubernetes_executor_config_pb2 as pb
2121

2222

2323
def use_config_map_as_env(
2424
task: PipelineTask,
25-
config_map_name: str,
25+
config_map_name: Union[pipeline_channel.PipelineParameterChannel, str],
2626
config_map_key_to_env: Dict[str, str],
2727
) -> PipelineTask:
2828
"""Use a Kubernetes ConfigMap as an environment variable as described by the `Kubernetes documentation
@@ -45,10 +45,14 @@ def use_config_map_as_env(
4545
env_var=env_var,
4646
) for config_map_key, env_var in config_map_key_to_env.items()
4747
]
48-
config_map_as_env = pb.ConfigMapAsEnv(
49-
config_map_name=config_map_name,
50-
key_to_env=key_to_env,
51-
)
48+
config_map_as_env = pb.ConfigMapAsEnv(key_to_env=key_to_env)
49+
50+
config_map_name_parameter = common.parse_k8s_parameter_input(config_map_name, task)
51+
config_map_as_env.config_name_parameter.CopyFrom(config_map_name_parameter)
52+
53+
# deprecated: for backwards compatibility
54+
if isinstance(config_map_name, str):
55+
config_map_as_env.config_map_name = config_map_name
5256

5357
msg.config_map_as_env.append(config_map_as_env)
5458

@@ -59,7 +63,7 @@ def use_config_map_as_env(
5963

6064
def use_config_map_as_volume(
6165
task: PipelineTask,
62-
config_map_name: str,
66+
config_map_name: Union[pipeline_channel.PipelineParameterChannel, str],
6367
mount_path: str,
6468
optional: bool = False,
6569
) -> PipelineTask:
@@ -79,12 +83,18 @@ def use_config_map_as_volume(
7983
msg = common.get_existing_kubernetes_config_as_message(task)
8084

8185
config_map_as_vol = pb.ConfigMapAsVolume(
82-
config_map_name=config_map_name,
8386
mount_path=mount_path,
8487
optional=optional,
8588
)
86-
msg.config_map_as_volume.append(config_map_as_vol)
8789

90+
config_map_name_parameter = common.parse_k8s_parameter_input(config_map_name, task)
91+
config_map_as_vol.config_name_parameter.CopyFrom(config_map_name_parameter)
92+
93+
# deprecated: for backwards compatibility
94+
if isinstance(config_map_name, str):
95+
config_map_as_vol.config_map_name = config_map_name
96+
97+
msg.config_map_as_volume.append(config_map_as_vol)
8898
task.platform_config['kubernetes'] = json_format.MessageToDict(msg)
8999

90100
return task

0 commit comments

Comments
 (0)