Skip to content

Commit 5767673

Browse files
authored
fix(sdk): fix nested loop counter param bug (kubeflow#1080)
* fix nested loop counter param bug * address comments
1 parent 6117df5 commit 5767673

File tree

5 files changed

+359
-2
lines changed

5 files changed

+359
-2
lines changed

sdk/python/kfp_tekton/compiler/_tekton_handler.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,10 +257,10 @@ def _handle_tekton_custom_task(custom_task: dict, workflow: dict, recursive_task
257257
custom_task_cr['spec']['iterationNumberParam']]
258258
custom_task_cr['spec']['iterateParam'] = custom_task[custom_task_key]['loop_args']
259259
separator = custom_task[custom_task_key].get('separator')
260+
start_end_step_keys = ['from', 'to', 'step']
260261
if separator is not None:
261262
custom_task_cr['spec']['iterateParamStringSeparator'] = separator
262263
if custom_task[custom_task_key].get('start') is not None:
263-
start_end_step_keys = ['from', 'to', 'step']
264264
custom_task_cr['spec']['pipelineSpec']['params'] = [value for value
265265
in custom_task_cr['spec']['pipelineSpec']['params']
266266
if value['name'] not in start_end_step_keys]
@@ -270,10 +270,24 @@ def _handle_tekton_custom_task(custom_task: dict, workflow: dict, recursive_task
270270

271271
custom_task_cr['spec']['iterateNumeric'] = custom_task_cr['spec']['iterateParam']
272272
custom_task_cr['spec'].pop('iterateParam')
273+
# check whether or not the nested custom task param reference need to be replaced
274+
custom_task_param_map = {}
273275
for custom_task_param in custom_task[custom_task_key]['spec']['params']:
274276
if custom_task_param['name'] != custom_task[custom_task_key]['loop_args'] and '$(tasks.' in custom_task_param['value']:
277+
custom_task_param_map.setdefault(custom_task_param['value'], []).append('$(params.%s)' % custom_task_param['name'])
278+
for key, item in custom_task_param_map.items():
279+
replacement_item = None
280+
if len(item) == 1:
281+
replacement_item = item[0]
282+
if len(item) > 1:
283+
forbidden_keystrings = ['$(params.%s)' % x for x in start_end_step_keys]
284+
for i in item:
285+
if i not in forbidden_keystrings:
286+
replacement_item = i
287+
break
288+
if replacement_item:
275289
custom_task_cr = json.loads(
276-
json.dumps(custom_task_cr).replace(custom_task_param['value'], '$(params.%s)' % custom_task_param['name']))
290+
json.dumps(custom_task_cr).replace(key, replacement_item))
277291

278292
# remove separator from CR params
279293
if custom_task[custom_task_key].get('separator') is not None:

sdk/python/tests/compiler/compiler_tests.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,13 @@ def test_recur_cond_workflow(self):
190190
from .testdata.tekton_loop_dsl import pipeline
191191
self._test_pipeline_workflow(pipeline, 'tekton_loop_dsl.yaml')
192192

193+
def test_nested_loop_counter_param_workflow(self):
194+
"""
195+
Test compiling a loop workflow using tekton nested loop with counter params.
196+
"""
197+
from .testdata.nested_loop_counter_param import output_in_range_and_pass
198+
self._test_pipeline_workflow(output_in_range_and_pass, 'nested_loop_counter_param.yaml')
199+
193200
def test_nested_loop_same_arg_workflow(self):
194201
"""
195202
Test compiling a nested loop with same argument workflow.
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
# Copyright 2022 kubeflow.org
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import itertools
16+
17+
from kfp import dsl, components
18+
from kfp.components import load_component_from_text
19+
from kfp_tekton.tekton import TEKTON_CUSTOM_TASK_IMAGES, Loop
20+
from kfp_tekton.compiler import TektonCompiler
21+
import yaml
22+
23+
24+
ARTIFACT_FETCHER_IMAGE_NAME = "fetcher/image:latest"
25+
TEKTON_CUSTOM_TASK_IMAGES = TEKTON_CUSTOM_TASK_IMAGES.append(ARTIFACT_FETCHER_IMAGE_NAME)
26+
27+
_artifact_fetcher_no = 0
28+
29+
30+
def artifact_fetcher(**artifact_paths: str):
31+
'''A containerOp template resolving some artifacts, given their paths.'''
32+
global _artifact_fetcher_no
33+
template_yaml = {
34+
'name': f'artifact-fetcher-{_artifact_fetcher_no}',
35+
'description': 'Artifact Fetch',
36+
'inputs': [
37+
{'name': name, 'type': 'String'}
38+
for name in artifact_paths.keys()
39+
],
40+
'outputs': [
41+
{'name': name, 'type': 'Artifact'}
42+
for name in artifact_paths.keys()
43+
],
44+
'implementation': {
45+
'container': {
46+
'image': ARTIFACT_FETCHER_IMAGE_NAME,
47+
'command': ['sh', '-c'], # irrelevant
48+
'args': [
49+
'--apiVersion', 'fetcher.tekton.dev/v1alpha1',
50+
'--kind', 'FETCHER',
51+
'--name', 'artifact_fetcher',
52+
*itertools.chain(*[
53+
(f'--{name}', {'inputValue': name})
54+
for name in artifact_paths.keys()
55+
])
56+
]
57+
}
58+
}
59+
}
60+
_artifact_fetcher_no += 1
61+
template_str = yaml.dump(template_yaml, Dumper=yaml.SafeDumper)
62+
template = components.load_component_from_text(template_str)
63+
op = template(**artifact_paths)
64+
op.add_pod_annotation("valid_container", "false")
65+
return op
66+
67+
68+
class Coder:
69+
def empty(self):
70+
return ""
71+
72+
73+
TektonCompiler._get_unique_id_code = Coder.empty
74+
75+
76+
def PrintOp(name: str, msg: str = None):
77+
if msg is None:
78+
msg = name
79+
print_op = load_component_from_text(
80+
"""
81+
name: %s
82+
inputs:
83+
- {name: input_text, type: String, description: 'Represents an input parameter.'}
84+
outputs:
85+
- {name: output_value, type: String, description: 'Represents an output paramter.'}
86+
implementation:
87+
container:
88+
image: alpine:3.6
89+
command:
90+
- sh
91+
- -c
92+
- |
93+
set -e
94+
echo $0 > $1
95+
- {inputValue: input_text}
96+
- {outputPath: output_value}
97+
""" % (name)
98+
)
99+
return print_op(msg)
100+
101+
102+
@dsl.pipeline("output_in_range_and_pass")
103+
def output_in_range_and_pass():
104+
op0 = PrintOp('print-0', f"Hello!")
105+
with Loop.range(1, op0.output, step=2):
106+
op1 = PrintOp('print-1', f"print {op0.output}")
107+
op2 = artifact_fetcher(path=op0.output)
108+
109+
110+
if __name__ == '__main__':
111+
TektonCompiler().compile(output_in_range_and_pass, __file__.replace('.py', '.yaml'))
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
# Copyright 2021 kubeflow.org
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
apiVersion: tekton.dev/v1beta1
16+
kind: PipelineRun
17+
metadata:
18+
name: output-in-range-and-pass
19+
annotations:
20+
tekton.dev/output_artifacts: '{"print-0": [{"key": "artifacts/$PIPELINERUN/print-0/output_value.tgz",
21+
"name": "print-0-output_value", "path": "/tmp/outputs/output_value/data"}],
22+
"print-1": [{"key": "artifacts/$PIPELINERUN/print-1/output_value.tgz", "name":
23+
"print-1-output_value", "path": "/tmp/outputs/output_value/data"}]}'
24+
tekton.dev/input_artifacts: '{"print-1": [{"name": "print-0-output_value", "parent_task":
25+
"print-0"}]}'
26+
tekton.dev/artifact_bucket: mlpipeline
27+
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
28+
tekton.dev/artifact_endpoint_scheme: http://
29+
tekton.dev/artifact_items: '{"print-0": [["output_value", "$(results.output-value.path)"]],
30+
"print-1": [["output_value", "$(results.output-value.path)"]]}'
31+
sidecar.istio.io/inject: "false"
32+
tekton.dev/template: ''
33+
pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
34+
pipelines.kubeflow.org/pipeline_spec: '{"name": "output_in_range_and_pass"}'
35+
labels:
36+
pipelines.kubeflow.org/pipelinename: ''
37+
pipelines.kubeflow.org/generation: ''
38+
spec:
39+
pipelineSpec:
40+
tasks:
41+
- name: print-0
42+
taskSpec:
43+
steps:
44+
- name: main
45+
command:
46+
- sh
47+
- -c
48+
- |
49+
set -e
50+
echo $0 > $1
51+
- Hello!
52+
- $(results.output-value.path)
53+
image: alpine:3.6
54+
results:
55+
- name: output-value
56+
type: string
57+
description: /tmp/outputs/output_value/data
58+
metadata:
59+
labels:
60+
pipelines.kubeflow.org/cache_enabled: "true"
61+
annotations:
62+
pipelines.kubeflow.org/component_spec_digest: '{"name": "print-0", "outputs":
63+
[{"description": "Represents an output paramter.", "name": "output_value",
64+
"type": "String"}], "version": "print-0@sha256=5dd3c506ec54281b82008ca8ec5d8142834eae18d74ce8a110a31dd6e371b40d"}'
65+
- runAfter:
66+
- print-0
67+
name: output-in-range-and-pass-for-loop-2
68+
params:
69+
- name: from
70+
value: '1'
71+
- name: print-0-output_value
72+
value: $(tasks.print-0.results.output-value)
73+
- name: step
74+
value: '2'
75+
- name: to
76+
value: $(tasks.print-0.results.output-value)
77+
taskSpec:
78+
apiVersion: custom.tekton.dev/v1alpha1
79+
kind: PipelineLoop
80+
spec:
81+
pipelineSpec:
82+
params:
83+
- name: loop-item-param-1
84+
type: string
85+
- name: print-0-output_value
86+
type: string
87+
tasks:
88+
- name: print-1
89+
params:
90+
- name: print-0-output_value
91+
value: $(params.print-0-output_value)
92+
taskSpec:
93+
steps:
94+
- name: main
95+
command:
96+
- sh
97+
- -c
98+
- |
99+
set -e
100+
echo $0 > $1
101+
- print $(inputs.params.print-0-output_value)
102+
- $(results.output-value.path)
103+
image: alpine:3.6
104+
params:
105+
- name: print-0-output_value
106+
type: string
107+
results:
108+
- name: output-value
109+
type: string
110+
description: /tmp/outputs/output_value/data
111+
metadata:
112+
labels:
113+
pipelines.kubeflow.org/cache_enabled: "true"
114+
annotations:
115+
pipelines.kubeflow.org/component_spec_digest: '{"name": "print-1",
116+
"outputs": [{"description": "Represents an output paramter.",
117+
"name": "output_value", "type": "String"}], "version": "print-1@sha256=3b81342bc143f625b58ebdb01e7c83b145880dee807be35c1e16fdb835d46580"}'
118+
- name: artifact-fetcher-0
119+
params:
120+
- name: path
121+
value: $(params.print-0-output_value)
122+
taskRef:
123+
name: artifact_fetcher
124+
apiVersion: fetcher.tekton.dev/v1alpha1
125+
kind: FETCHER
126+
iterateNumeric: loop-item-param-1
127+
metadata:
128+
labels:
129+
pipelines.kubeflow.org/cache_enabled: "true"
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
# Copyright 2021 kubeflow.org
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
apiVersion: tekton.dev/v1beta1
16+
kind: PipelineRun
17+
metadata:
18+
name: output-in-range-and-pass
19+
annotations:
20+
tekton.dev/output_artifacts: '{"print-0": [{"key": "artifacts/$PIPELINERUN/print-0/output_value.tgz",
21+
"name": "print-0-output_value", "path": "/tmp/outputs/output_value/data"}],
22+
"print-1": [{"key": "artifacts/$PIPELINERUN/print-1/output_value.tgz", "name":
23+
"print-1-output_value", "path": "/tmp/outputs/output_value/data"}]}'
24+
tekton.dev/input_artifacts: '{"print-1": [{"name": "print-0-output_value", "parent_task":
25+
"print-0"}]}'
26+
tekton.dev/artifact_bucket: mlpipeline
27+
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
28+
tekton.dev/artifact_endpoint_scheme: http://
29+
tekton.dev/artifact_items: '{"print-0": [["output_value", "$(results.output-value.path)"]],
30+
"print-1": [["output_value", "$(results.output-value.path)"]]}'
31+
sidecar.istio.io/inject: "false"
32+
tekton.dev/template: ''
33+
pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
34+
pipelines.kubeflow.org/pipeline_spec: '{"name": "output_in_range_and_pass"}'
35+
tekton.dev/resource_templates: '[{"apiVersion": "custom.tekton.dev/v1alpha1",
36+
"kind": "PipelineLoop", "metadata": {"name": "output-in-range-and-pass-for-loop-2"},
37+
"spec": {"iterateNumeric": "loop-item-param-1", "pipelineSpec": {"params": [{"name":
38+
"loop-item-param-1", "type": "string"}, {"name": "print-0-output_value", "type":
39+
"string"}], "tasks": [{"name": "print-1", "params": [{"name": "print-0-output_value",
40+
"value": "$(params.print-0-output_value)"}], "taskSpec": {"metadata": {"annotations":
41+
{"pipelines.kubeflow.org/component_spec_digest": "{\"name\": \"print-1\", \"outputs\":
42+
[{\"description\": \"Represents an output paramter.\", \"name\": \"output_value\",
43+
\"type\": \"String\"}], \"version\": \"print-1@sha256=3b81342bc143f625b58ebdb01e7c83b145880dee807be35c1e16fdb835d46580\"}"},
44+
"labels": {"pipelines.kubeflow.org/cache_enabled": "true"}}, "params": [{"name":
45+
"print-0-output_value", "type": "string"}], "results": [{"description": "/tmp/outputs/output_value/data",
46+
"name": "output-value", "type": "string"}], "steps": [{"command": ["sh", "-c",
47+
"set -e\necho $0 > $1\n", "print $(inputs.params.print-0-output_value)", "$(results.output-value.path)"],
48+
"image": "alpine:3.6", "name": "main"}]}}, {"name": "artifact-fetcher-1", "params":
49+
[{"name": "path", "value": "$(params.print-0-output_value)"}], "taskRef": {"apiVersion":
50+
"fetcher.tekton.dev/v1alpha1", "kind": "FETCHER", "name": "artifact_fetcher"}}]}}}]'
51+
labels:
52+
pipelines.kubeflow.org/pipelinename: ''
53+
pipelines.kubeflow.org/generation: ''
54+
spec:
55+
pipelineSpec:
56+
tasks:
57+
- name: print-0
58+
taskSpec:
59+
steps:
60+
- name: main
61+
command:
62+
- sh
63+
- -c
64+
- |
65+
set -e
66+
echo $0 > $1
67+
- Hello!
68+
- $(results.output-value.path)
69+
image: alpine:3.6
70+
results:
71+
- name: output-value
72+
type: string
73+
description: /tmp/outputs/output_value/data
74+
metadata:
75+
labels:
76+
pipelines.kubeflow.org/cache_enabled: "true"
77+
annotations:
78+
pipelines.kubeflow.org/component_spec_digest: '{"name": "print-0", "outputs":
79+
[{"description": "Represents an output paramter.", "name": "output_value",
80+
"type": "String"}], "version": "print-0@sha256=5dd3c506ec54281b82008ca8ec5d8142834eae18d74ce8a110a31dd6e371b40d"}'
81+
- runAfter:
82+
- print-0
83+
name: output-in-range-and-pass-for-loop-2
84+
taskRef:
85+
apiVersion: custom.tekton.dev/v1alpha1
86+
kind: PipelineLoop
87+
name: output-in-range-and-pass-for-loop-2
88+
params:
89+
- name: from
90+
value: '1'
91+
- name: print-0-output_value
92+
value: $(tasks.print-0.results.output-value)
93+
- name: step
94+
value: '2'
95+
- name: to
96+
value: $(tasks.print-0.results.output-value)

0 commit comments

Comments
 (0)