Skip to content

Commit 7e8d5ad

Browse files
authored
feat(sdk): add iterateParamPassStyle and itemPassStyle params to loop (kubeflow#1059)
* add iterateParamPassStyle and itemPassStyle params to loop * update new fields to store in extra configs * update dsl to take extra_fields as new arguments * make extra field values configurable * update error message
1 parent d0a0706 commit 7e8d5ad

File tree

5 files changed

+77
-19
lines changed

5 files changed

+77
-19
lines changed

sdk/python/kfp_tekton/compiler/compiler.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,23 @@ def process_parameter(parameter):
573573
if sub_group.parallelism is not None and sub_group.parallelism > 0:
574574
self.loops_pipeline[group_name]['spec']['parallelism'] = sub_group.parallelism
575575

576+
def insert_extra_config_field(config_name, config_object, extra_field_name):
577+
# Default allowed values
578+
config_value_list = ['inline', 'file']
579+
config_value = config_object.lower()
580+
# Update the list of allowed values if exist
581+
if hasattr(sub_group, 'config_value_list'):
582+
config_value_list = sub_group.config_value_list.get(extra_field_name, config_value_list)
583+
if config_value in config_value_list:
584+
self.loops_pipeline[group_name]['spec'][config_name] = config_value
585+
else:
586+
raise ValueError("%s value in loop %s must be one of [%s], not %s" %
587+
(config_name, group_name, ",".join(config_value_list), config_value))
588+
if hasattr(sub_group, 'iterate_param_pass_style') and sub_group.iterate_param_pass_style is not None:
589+
insert_extra_config_field('iterateParamPassStyle', sub_group.iterate_param_pass_style, 'iterate_param_pass_style')
590+
if hasattr(sub_group, 'item_pass_style') and sub_group.item_pass_style is not None:
591+
insert_extra_config_field('itemPassStyle', sub_group.item_pass_style, 'item_pass_style')
592+
576593
return template
577594

578595
def _create_dag_templates(self, pipeline, op_transformers=None, params=None, op_to_templates_handler=None):

sdk/python/kfp_tekton/tekton.py

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -239,23 +239,41 @@ class Loop(dsl.ParallelFor):
239239

240240
@classmethod
241241
def sequential(cls,
242-
loop_args: _for_loop.ItemList):
243-
return cls(loop_args=loop_args, parallelism=1)
242+
loop_args: _for_loop.ItemList,
243+
extra_fields: Optional[dict] = None,
244+
valid_extra_field_values: Optional[dict] = None):
245+
return cls(loop_args=loop_args,
246+
parallelism=1,
247+
extra_fields=extra_fields,
248+
valid_extra_field_values=valid_extra_field_values)
244249

245250
@classmethod
246251
def from_string(cls,
247252
loop_args: Union[str, _pipeline_param.PipelineParam],
248253
separator: Optional[Union[str, _pipeline_param.PipelineParam]] = None,
249-
parallelism: Optional[int] = None):
250-
return cls(loop_args=loop_args, separator=separator, parallelism=parallelism)
254+
parallelism: Optional[int] = None,
255+
extra_fields: Optional[dict] = None,
256+
valid_extra_field_values: Optional[dict] = None):
257+
return cls(loop_args=loop_args,
258+
separator=separator,
259+
parallelism=parallelism,
260+
extra_fields=extra_fields,
261+
valid_extra_field_values=valid_extra_field_values)
251262

252263
@classmethod
253264
def range(cls,
254265
start: Union[_Num, PipelineParam],
255266
end: Union[_Num, PipelineParam],
256267
step: Optional[Union[_Num, PipelineParam]] = None,
257-
parallelism: Optional[int] = None):
258-
return cls(start=start, step=step, end=end, parallelism=parallelism)
268+
parallelism: Optional[int] = None,
269+
extra_fields: Optional[dict] = None,
270+
valid_extra_field_values: Optional[dict] = None):
271+
return cls(start=start,
272+
step=step,
273+
end=end,
274+
parallelism=parallelism,
275+
extra_fields=extra_fields,
276+
valid_extra_field_values=valid_extra_field_values)
259277

260278
def add_pod_annotation(self, name: str, value: str):
261279
"""Adds a pod's metadata annotation.
@@ -291,14 +309,31 @@ def __init__(self,
291309
end: Union[_Num, PipelineParam, None] = None,
292310
step: Union[_Num, PipelineParam, None] = None,
293311
separator: Optional[Union[str, _pipeline_param.PipelineParam]] = None,
294-
parallelism: Optional[int] = None):
312+
parallelism: Optional[int] = None,
313+
extra_fields: Optional[dict] = None,
314+
valid_extra_field_values: Optional[dict] = None):
295315
self.start = None
296316
self.end = None
297317
self.step = None
298318
self.call_enumerate = False
299319
self.iteration_number = None
300320
self.pod_annotations = {}
301321
self.pod_labels = {}
322+
self.iterate_param_pass_style = None
323+
self.item_pass_style = None
324+
self.config_value_list = {"iterate_param_pass_style": ["inline", "file"],
325+
"item_pass_style": ["inline", "file"]}
326+
if extra_fields:
327+
if extra_fields.get('iterate_param_pass_style'):
328+
self.iterate_param_pass_style = extra_fields['iterate_param_pass_style']
329+
if extra_fields.get('item_pass_style'):
330+
self.item_pass_style = extra_fields['item_pass_style']
331+
332+
# Update allowed values in the extra fields
333+
if valid_extra_field_values:
334+
for k, v in valid_extra_field_values.items():
335+
self.config_value_list[k] = v
336+
302337
if start and end:
303338
super().__init__(loop_args=["iteration"], parallelism=parallelism)
304339
self.start = start

sdk/python/tests/compiler/testdata/loop_with_numeric.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,12 @@ def empty(self):
4747
def pipeline(my_pipe_param: int = 10, start: int = 1, end: int = 2):
4848
start_2 = 1
4949
end_2 = 2
50-
with Loop.range(start=start, end=end) as item:
50+
iterate_param_pass_style_field = {'iterate_param_pass_style': 'inline'}
51+
item_pass_style_field = {'item_pass_style': 'file'}
52+
with Loop.range(start=start, end=end, extra_fields=iterate_param_pass_style_field) as item:
5153
op1_template = components.load_component_from_text(op1_yaml)
5254
op1_template(item, my_pipe_param)
53-
with Loop.range(start=start_2, end=end_2) as item2:
55+
with Loop.range(start=start_2, end=end_2, extra_fields=item_pass_style_field) as item2:
5456
op1_template(item2, my_pipe_param)
5557

5658

sdk/python/tests/compiler/testdata/loop_with_numeric.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ spec:
5858
value: $(params.my_pipe_param)
5959
- name: to
6060
value: $(params.end)
61+
iterateParamPassStyle: inline
6162
taskSpec:
6263
apiVersion: custom.tekton.dev/v1alpha1
6364
kind: PipelineLoop
@@ -107,6 +108,7 @@ spec:
107108
value: $(params.my_pipe_param)
108109
- name: to
109110
value: '2'
111+
itemPassStyle: file
110112
taskSpec:
111113
apiVersion: custom.tekton.dev/v1alpha1
112114
kind: PipelineLoop

sdk/python/tests/compiler/testdata/loop_with_numeric_noninlined.yaml

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,17 @@ metadata:
4242
"loop-item-param-1", "type": "string"}, {"name": "my_pipe_param", "type": "string"}],
4343
"steps": [{"args": ["set -e\necho op1 \"$0\" \"$1\"\n", "$(inputs.params.loop-item-param-1)",
4444
"$(inputs.params.my_pipe_param)"], "command": ["sh", "-c"], "image": "library/bash:4.4.23",
45-
"name": "main"}]}}, {"name": "my-pipeline-for-loop-4", "params": [{"name": "from",
46-
"value": "1"}, {"name": "my_pipe_param", "value": "$(params.my_pipe_param)"},
47-
{"name": "to", "value": "2"}], "taskRef": {"apiVersion": "custom.tekton.dev/v1alpha1",
48-
"kind": "PipelineLoop", "name": "my-pipeline-for-loop-4"}}]}}}, {"apiVersion":
49-
"custom.tekton.dev/v1alpha1", "kind": "PipelineLoop", "metadata": {"name": "my-pipeline-for-loop-4"},
50-
"spec": {"iterateNumeric": "loop-item-param-3", "pipelineSpec": {"params": [{"name":
51-
"loop-item-param-3", "type": "string"}, {"name": "my_pipe_param", "type": "string"}],
52-
"tasks": [{"name": "my-in-coop1-2", "params": [{"name": "loop-item-param-3",
53-
"value": "$(params.loop-item-param-3)"}, {"name": "my_pipe_param", "value":
54-
"$(params.my_pipe_param)"}], "taskSpec": {"metadata": {"annotations": {"pipelines.kubeflow.org/component_spec_digest":
45+
"name": "main"}]}}, {"itemPassStyle": "file", "name": "my-pipeline-for-loop-4",
46+
"params": [{"name": "from", "value": "1"}, {"name": "my_pipe_param", "value":
47+
"$(params.my_pipe_param)"}, {"name": "to", "value": "2"}], "taskRef": {"apiVersion":
48+
"custom.tekton.dev/v1alpha1", "kind": "PipelineLoop", "name": "my-pipeline-for-loop-4"}}]}}},
49+
{"apiVersion": "custom.tekton.dev/v1alpha1", "kind": "PipelineLoop", "metadata":
50+
{"name": "my-pipeline-for-loop-4"}, "spec": {"iterateNumeric": "loop-item-param-3",
51+
"pipelineSpec": {"params": [{"name": "loop-item-param-3", "type": "string"},
52+
{"name": "my_pipe_param", "type": "string"}], "tasks": [{"name": "my-in-coop1-2",
53+
"params": [{"name": "loop-item-param-3", "value": "$(params.loop-item-param-3)"},
54+
{"name": "my_pipe_param", "value": "$(params.my_pipe_param)"}], "taskSpec":
55+
{"metadata": {"annotations": {"pipelines.kubeflow.org/component_spec_digest":
5556
"{\"name\": \"my-in-coop1\", \"outputs\": [], \"version\": \"my-in-coop1@sha256=42f0c6a60c7e435441b8afaeb382a771a9741fe3aabb203748fdbd72b25f1628\"}"},
5657
"labels": {"pipelines.kubeflow.org/cache_enabled": "true"}}, "params": [{"name":
5758
"loop-item-param-3", "type": "string"}, {"name": "my_pipe_param", "type": "string"}],
@@ -90,3 +91,4 @@ spec:
9091
value: $(params.my_pipe_param)
9192
- name: to
9293
value: $(params.end)
94+
iterateParamPassStyle: inline

0 commit comments

Comments
 (0)