Skip to content

Commit 30eb178

Browse files
ScrapCodesTomcli
andauthored
feat(pipelineloop): Support last_idx and last_elem (kubeflow#1044)
* WIP: Support last_idx and last_elem * WIP: SDK changes to support last-idx variable. * fix * fix sdk param * fix controller to correctly update results * fix tests * fix unit tests and lint * fix test cases to be executable (opendatahub-io#5) * fix unit tests and lint * fix test cases to be executatble * removed unused code and added test * fix test Co-authored-by: Tommy Li <[email protected]>
1 parent 69b84e8 commit 30eb178

File tree

11 files changed

+417
-37
lines changed

11 files changed

+417
-37
lines changed

sdk/python/kfp_tekton/compiler/compiler.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ def __init__(self, **kwargs):
144144
self.produce_taskspec = True
145145
self.security_context = None
146146
self.automount_service_account_token = None
147+
self.group_names = {}
147148
super().__init__(**kwargs)
148149

149150
def _set_pipeline_conf(self, tekton_pipeline_conf: TektonPipelineConf):
@@ -205,11 +206,13 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies, pipeline_
205206
pipeline_name_copy = sanitize_k8s_name(pipeline_name, max_length=LOOP_PIPELINE_NAME_LENGTH)
206207
sub_group_name_copy = sanitize_k8s_name(sub_group.name, max_length=LOOP_GROUP_NAME_LENGTH, rev_truncate=True)
207208
self._group_names = [pipeline_name_copy, sub_group_name_copy]
209+
raw_group_name = '-'.join(self._group_names)
208210
if self.uuid:
209211
self._group_names.insert(1, self.uuid)
210212
# pipeline name (max 40) + loop id (max 5) + group name (max 16) + two connecting dashes (2) = 63 (Max size for CRD names)
211213
group_name = '-'.join(self._group_names) if group_type == "loop" or \
212214
group_type == "graph" or group_type == 'addon' else sub_group.name
215+
self.group_names[raw_group_name] = group_name
213216
template = {
214217
'metadata': {
215218
'name': group_name,
@@ -667,6 +670,8 @@ def _get_dependencies(self, pipeline, root_group, op_groups,
667670
upstream_op = pipeline.ops[upstream_op_name]
668671
elif upstream_op_name in opsgroups:
669672
upstream_op = opsgroups[upstream_op_name]
673+
elif "for-loop" in upstream_op_name:
674+
continue
670675
else:
671676
raise ValueError('compiler cannot find the ' +
672677
upstream_op_name)
@@ -740,7 +745,9 @@ def _get_inputs_outputs(
740745
if param.value:
741746
continue
742747
if param.op_name:
743-
upstream_op = pipeline.ops[param.op_name]
748+
upstream_op = pipeline.ops.get(param.op_name, None)
749+
if not upstream_op:
750+
continue
744751
upstream_groups, downstream_groups = \
745752
self._get_uncommon_ancestors(op_groups, opsgroup_groups, upstream_op, op)
746753
for i, group_name in enumerate(downstream_groups):
@@ -1823,6 +1830,14 @@ def _create_and_write_workflow(self,
18231830
workflow['metadata']['annotations']['tekton.dev/resource_templates'] = json.dumps(resource_templates,
18241831
sort_keys=True)
18251832

1833+
# Inject uuid to loop parameter task name if exist
1834+
if self.uuid:
1835+
for k, v in self.group_names.items():
1836+
if workflow['spec'].get('pipelineSpec'):
1837+
for task in workflow['spec']['pipelineSpec']['tasks']:
1838+
for param in task.get('params', []):
1839+
if isinstance(param['value'], str):
1840+
param['value'] = param['value'].replace(k, v)
18261841
TektonCompiler._write_workflow(workflow=workflow, package_path=package_path) # Tekton change
18271842

18281843
# Separate custom task CR from the main workflow

sdk/python/kfp_tekton/tekton.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,9 @@ def add_pod_label(self, name: str, value: str):
280280
def _next_id(self):
281281
return str(_pipeline.Pipeline.get_default_pipeline().get_next_group_id())
282282

283+
def _seek_id(self):
284+
return str(_pipeline.Pipeline.get_default_pipeline().group_id)
285+
283286
def __init__(self,
284287
loop_args: Union[str,
285288
_for_loop.ItemList,
@@ -324,6 +327,12 @@ def __init__(self,
324327
value=separator
325328
)
326329

330+
# param_name is unique on the pipeline level but not cluster level. Therefore, we still need to replace it to a cluster
331+
# unique uuid during our compiler step.
332+
param_name = "-".join([_pipeline.Pipeline.get_default_pipeline().name, self.type, str(int(self._seek_id()) + 1)])
333+
self.last_idx = dsl.PipelineParam(name="last-idx", op_name=param_name)
334+
self.last_elem = dsl.PipelineParam(name="last-elem", op_name=param_name)
335+
327336
def __enter__(self) -> Union[Tuple[TektonLoopIterationNumber, LoopArguments], _for_loop.LoopArguments]:
328337
rev = super().__enter__()
329338
if self.call_enumerate:

sdk/python/tests/compiler/compiler_tests.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,13 @@
6464

6565
class TestTektonCompiler(unittest.TestCase):
6666

67+
def test_last_idx(self):
68+
"""
69+
Test compiling a initial container workflow.
70+
"""
71+
from .testdata.last_idx import pipeline
72+
self._test_pipeline_workflow(pipeline, 'last_idx.yaml', skip_noninlined=False)
73+
6774
def test_init_container_workflow(self):
6875
"""
6976
Test compiling a initial container workflow.
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# Copyright 2020 kubeflow.org
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
from kfp_tekton.compiler import TektonCompiler
17+
from kfp import dsl
18+
from kfp.components import load_component_from_text
19+
from kfp_tekton import tekton
20+
21+
PrintOp = load_component_from_text("""
22+
name: print
23+
inputs:
24+
- name: msg
25+
implementation:
26+
container:
27+
image: alpine:3.6
28+
command:
29+
- sh
30+
- -c
31+
- |
32+
set -e
33+
echo $0
34+
- {inputValue: msg }
35+
""")
36+
37+
38+
@dsl.pipeline(name='pipeline')
39+
def pipeline(param: int = 10):
40+
loop_args = "1,2"
41+
loop = tekton.Loop.from_string(loop_args, separator=',')
42+
with loop.enumerate() as (idx, el):
43+
# loop enter-local variables, not available outside of the loop
44+
PrintOp(f"it no {idx}: {el}")
45+
# using loop's fields, not enter-local variables
46+
PrintOp(f"last it: no {loop.last_idx}: {loop.last_elem}")
47+
48+
49+
if __name__ == "__main__":
50+
TektonCompiler().compile(pipeline, __file__.replace('.py', '.yaml'))
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
# Copyright 2021 kubeflow.org
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
apiVersion: tekton.dev/v1beta1
16+
kind: PipelineRun
17+
metadata:
18+
name: pipeline
19+
annotations:
20+
tekton.dev/output_artifacts: '{}'
21+
tekton.dev/input_artifacts: '{"print-2": [{"name": "pipeline-for-loop-4-last-elem",
22+
"parent_task": "pipeline-for-loop-4"}, {"name": "pipeline-for-loop-4-last-idx",
23+
"parent_task": "pipeline-for-loop-4"}]}'
24+
tekton.dev/artifact_bucket: mlpipeline
25+
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
26+
tekton.dev/artifact_endpoint_scheme: http://
27+
tekton.dev/artifact_items: '{"print": [], "print-2": []}'
28+
sidecar.istio.io/inject: "false"
29+
tekton.dev/template: ''
30+
pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
31+
pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "10", "name": "param",
32+
"optional": true, "type": "Integer"}], "name": "pipeline"}'
33+
labels:
34+
pipelines.kubeflow.org/pipelinename: ''
35+
pipelines.kubeflow.org/generation: ''
36+
spec:
37+
params:
38+
- name: param
39+
value: '10'
40+
pipelineSpec:
41+
params:
42+
- name: param
43+
default: '10'
44+
tasks:
45+
- name: print-2
46+
params:
47+
- name: pipeline-for-loop-4-last-elem
48+
value: $(tasks.pipeline-for-loop-4.results.last-elem)
49+
- name: pipeline-for-loop-4-last-idx
50+
value: $(tasks.pipeline-for-loop-4.results.last-idx)
51+
taskSpec:
52+
steps:
53+
- name: main
54+
command:
55+
- sh
56+
- -c
57+
- |
58+
set -e
59+
echo $0
60+
- 'last it: no $(inputs.params.pipeline-for-loop-4-last-idx): $(inputs.params.pipeline-for-loop-4-last-elem)'
61+
image: alpine:3.6
62+
params:
63+
- name: pipeline-for-loop-4-last-elem
64+
- name: pipeline-for-loop-4-last-idx
65+
metadata:
66+
labels:
67+
pipelines.kubeflow.org/cache_enabled: "true"
68+
annotations:
69+
pipelines.kubeflow.org/component_spec_digest: '{"name": "print", "outputs":
70+
[], "version": "print@sha256=fab857e67527b293d70a6637e295a10c70b9b8b114e2b64cd5a17a33e3cab2a1"}'
71+
- name: pipeline-for-loop-4
72+
params:
73+
- name: loop-item-param-2
74+
value: 1,2
75+
- name: loop-item-param-3
76+
value: ','
77+
taskSpec:
78+
apiVersion: custom.tekton.dev/v1alpha1
79+
kind: PipelineLoop
80+
spec:
81+
pipelineSpec:
82+
params:
83+
- name: iteration-number-5
84+
type: string
85+
- name: loop-item-param-2
86+
type: string
87+
tasks:
88+
- name: print
89+
params:
90+
- name: iteration-number-5
91+
value: $(params.iteration-number-5)
92+
- name: loop-item-param-2
93+
value: $(params.loop-item-param-2)
94+
taskSpec:
95+
steps:
96+
- name: main
97+
command:
98+
- sh
99+
- -c
100+
- |
101+
set -e
102+
echo $0
103+
- 'it no $(inputs.params.iteration-number-5): $(inputs.params.loop-item-param-2)'
104+
image: alpine:3.6
105+
params:
106+
- name: iteration-number-5
107+
type: string
108+
- name: loop-item-param-2
109+
type: string
110+
metadata:
111+
labels:
112+
pipelines.kubeflow.org/cache_enabled: "true"
113+
annotations:
114+
pipelines.kubeflow.org/component_spec_digest: '{"name": "print",
115+
"outputs": [], "version": "print@sha256=fab857e67527b293d70a6637e295a10c70b9b8b114e2b64cd5a17a33e3cab2a1"}'
116+
iterationNumberParam: iteration-number-5
117+
iterateParam: loop-item-param-2
118+
iterateParamStringSeparator: loop-item-param-3
119+
metadata:
120+
labels:
121+
pipelines.kubeflow.org/cache_enabled: "true"
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
# Copyright 2021 kubeflow.org
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
apiVersion: tekton.dev/v1beta1
16+
kind: PipelineRun
17+
metadata:
18+
name: pipeline
19+
annotations:
20+
tekton.dev/output_artifacts: '{}'
21+
tekton.dev/input_artifacts: '{"print-2": [{"name": "pipeline-for-loop-4-last-elem",
22+
"parent_task": "pipeline-for-loop-4"}, {"name": "pipeline-for-loop-4-last-idx",
23+
"parent_task": "pipeline-for-loop-4"}]}'
24+
tekton.dev/artifact_bucket: mlpipeline
25+
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
26+
tekton.dev/artifact_endpoint_scheme: http://
27+
tekton.dev/artifact_items: '{"print": [], "print-2": []}'
28+
sidecar.istio.io/inject: "false"
29+
tekton.dev/template: ''
30+
pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
31+
pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "10", "name": "param",
32+
"optional": true, "type": "Integer"}], "name": "pipeline"}'
33+
tekton.dev/resource_templates: '[{"apiVersion": "custom.tekton.dev/v1alpha1",
34+
"kind": "PipelineLoop", "metadata": {"name": "pipeline-for-loop-4"}, "spec":
35+
{"iterateParam": "loop-item-param-2", "iterateParamStringSeparator": "loop-item-param-3",
36+
"iterationNumberParam": "iteration-number-5", "pipelineSpec": {"params": [{"name":
37+
"iteration-number-5", "type": "string"}, {"name": "loop-item-param-2", "type":
38+
"string"}], "tasks": [{"name": "print", "params": [{"name": "iteration-number-5",
39+
"value": "$(params.iteration-number-5)"}, {"name": "loop-item-param-2", "value":
40+
"$(params.loop-item-param-2)"}], "taskSpec": {"metadata": {"annotations": {"pipelines.kubeflow.org/component_spec_digest":
41+
"{\"name\": \"print\", \"outputs\": [], \"version\": \"print@sha256=fab857e67527b293d70a6637e295a10c70b9b8b114e2b64cd5a17a33e3cab2a1\"}"},
42+
"labels": {"pipelines.kubeflow.org/cache_enabled": "true"}}, "params": [{"name":
43+
"iteration-number-5", "type": "string"}, {"name": "loop-item-param-2", "type":
44+
"string"}], "steps": [{"command": ["sh", "-c", "set -e\necho $0\n", "it no $(inputs.params.iteration-number-5):
45+
$(inputs.params.loop-item-param-2)"], "image": "alpine:3.6", "name": "main"}]}}]}}}]'
46+
labels:
47+
pipelines.kubeflow.org/pipelinename: ''
48+
pipelines.kubeflow.org/generation: ''
49+
spec:
50+
params:
51+
- name: param
52+
value: '10'
53+
pipelineSpec:
54+
params:
55+
- name: param
56+
default: '10'
57+
tasks:
58+
- name: print-2
59+
params:
60+
- name: pipeline-for-loop-4-last-elem
61+
value: $(tasks.pipeline-for-loop-4.results.last-elem)
62+
- name: pipeline-for-loop-4-last-idx
63+
value: $(tasks.pipeline-for-loop-4.results.last-idx)
64+
taskSpec:
65+
steps:
66+
- name: main
67+
command:
68+
- sh
69+
- -c
70+
- |
71+
set -e
72+
echo $0
73+
- 'last it: no $(inputs.params.pipeline-for-loop-4-last-idx): $(inputs.params.pipeline-for-loop-4-last-elem)'
74+
image: alpine:3.6
75+
params:
76+
- name: pipeline-for-loop-4-last-elem
77+
- name: pipeline-for-loop-4-last-idx
78+
metadata:
79+
labels:
80+
pipelines.kubeflow.org/cache_enabled: "true"
81+
annotations:
82+
pipelines.kubeflow.org/component_spec_digest: '{"name": "print", "outputs":
83+
[], "version": "print@sha256=fab857e67527b293d70a6637e295a10c70b9b8b114e2b64cd5a17a33e3cab2a1"}'
84+
- name: pipeline-for-loop-4
85+
taskRef:
86+
apiVersion: custom.tekton.dev/v1alpha1
87+
kind: PipelineLoop
88+
name: pipeline-for-loop-4
89+
params:
90+
- name: loop-item-param-2
91+
value: 1,2
92+
- name: loop-item-param-3
93+
value: ','

sdk/python/tests/compiler/testdata/long_recursive_group_name.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def PrintOp(name: str, msg: str):
3838
inputs:
3939
- {name: input_text, type: String, description: 'Represents an input parameter.'}
4040
outputs:
41-
- {name: output_value, type: String, description: 'Represents an output paramter.'}
41+
- {name: output_value, type: String, description: 'Represents an output parameter.'}
4242
implementation:
4343
container:
4444
image: alpine:3.6

sdk/python/tests/compiler/testdata/long_recursive_group_name.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ metadata:
4040
"condition-cel-outcome", "value": "$(tasks.condition-cel.results.outcome)"}],
4141
"taskSpec": {"metadata": {"annotations": {"pipelines.kubeflow.org/component_spec_digest":
4242
"{\"name\": \"print-iter\", \"outputs\": [{\"description\": \"Represents an
43-
output paramter.\", \"name\": \"output_value\", \"type\": \"String\"}], \"version\":
44-
\"print-iter@sha256=1390d83fb5d8aa6c6a41af858140c4c362850b13718dedacfc3fb0a633596313\"}"},
43+
output parameter.\", \"name\": \"output_value\", \"type\": \"String\"}], \"version\":
44+
\"print-iter@sha256=cefce1330b3d50a4e85b6ac51edd51f250a5cf18193353f27a2e13a9d7535555\"}"},
4545
"labels": {"pipelines.kubeflow.org/cache_enabled": "true"}}, "params": [{"name":
4646
"condition-cel-outcome", "type": "string"}], "results": [{"description": "/tmp/outputs/output_value/data",
4747
"name": "output-value", "type": "string"}], "steps": [{"command": ["sh", "-c",

sdk/python/tests/compiler/testdata/long_recursive_group_name_noninlined.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ metadata:
4040
"condition-cel-outcome", "value": "$(tasks.condition-cel.results.outcome)"}],
4141
"taskSpec": {"metadata": {"annotations": {"pipelines.kubeflow.org/component_spec_digest":
4242
"{\"name\": \"print-iter\", \"outputs\": [{\"description\": \"Represents an
43-
output paramter.\", \"name\": \"output_value\", \"type\": \"String\"}], \"version\":
44-
\"print-iter@sha256=1390d83fb5d8aa6c6a41af858140c4c362850b13718dedacfc3fb0a633596313\"}"},
43+
output parameter.\", \"name\": \"output_value\", \"type\": \"String\"}], \"version\":
44+
\"print-iter@sha256=cefce1330b3d50a4e85b6ac51edd51f250a5cf18193353f27a2e13a9d7535555\"}"},
4545
"labels": {"pipelines.kubeflow.org/cache_enabled": "true"}}, "params": [{"name":
4646
"condition-cel-outcome", "type": "string"}], "results": [{"description": "/tmp/outputs/output_value/data",
4747
"name": "output-value", "type": "string"}], "steps": [{"command": ["sh", "-c",

0 commit comments

Comments
 (0)