Skip to content

Commit b219b2e

Browse files
authored
fix(sdk): Add new logic to cover new tekton runafter condition (kubeflow#1016)
* add new logic to cover new tekton runafter condition * fix loop condition error
1 parent 909c4f2 commit b219b2e

File tree

7 files changed

+377
-0
lines changed

7 files changed

+377
-0
lines changed

sdk/python/kfp_tekton/compiler/compiler.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1394,6 +1394,45 @@ def get_when_task(input_task_when, depended_conditions):
13941394
pipeline_run['spec']['podTemplate']['nodeSelector'] = copy.deepcopy(pipeline_conf.default_pod_node_selector)
13951395
workflow = pipeline_run
13961396

1397+
# populate dependend condition for all the runafter tasks
1398+
def populate_runafter_condition(task):
1399+
task_runafter = task.get('runAfter')
1400+
if task_runafter:
1401+
for t in workflow['spec']['pipelineSpec']['tasks']:
1402+
if t['name'] in task_runafter:
1403+
if t.get('when'):
1404+
task.setdefault('when', [])
1405+
for when_item in t['when']:
1406+
if when_item not in task['when']:
1407+
add_conditions = True
1408+
# Do not add condition if the condition is not in the same graph/pipelineloop
1409+
for pipeline_loop in self.loops_pipeline.values():
1410+
if task['name'] in pipeline_loop['task_list']:
1411+
task_input = re.findall('\$\(tasks.([^ \t\n.:,;{}]+).results.([^ \t\n.:,;{}]+)\)', when_item['input'])
1412+
if task_input and task_input[0][0] not in pipeline_loop['task_list']:
1413+
add_conditions = False
1414+
if add_conditions:
1415+
task['when'].append(when_item)
1416+
1417+
# search runafter tree logic before populating the condition
1418+
visited_tasks = {}
1419+
task_queue = []
1420+
for task in workflow['spec']['pipelineSpec']['tasks']:
1421+
task_runafter = task.get('runAfter')
1422+
if task_runafter:
1423+
task_queue.append(task)
1424+
while task_queue:
1425+
popped_task = task_queue.pop(0)
1426+
populate_condition = True
1427+
for queued_task in task_queue:
1428+
if queued_task['name'] in popped_task['runAfter'] and len(task_queue) != visited_tasks.get(popped_task['name']):
1429+
visited_tasks[popped_task['name']] = len(task_queue)
1430+
task_queue.append(popped_task)
1431+
populate_condition = False
1432+
break
1433+
if populate_condition:
1434+
populate_runafter_condition(popped_task)
1435+
13971436
return workflow
13981437

13991438
def _sanitize_and_inject_artifact(self, pipeline: dsl.Pipeline, pipeline_conf=None):

sdk/python/tests/compiler/compiler_tests.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,13 @@ def test_big_data_multi_volumes_2_workflow(self):
148148
from .testdata.big_data_multi_volumes_2 import big_data
149149
self._test_pipeline_workflow(big_data, 'big_data_multi_volumes_2.yaml', skip_noninlined=True)
150150

151+
def test_condition_depend_workflow(self):
152+
"""
153+
Test compiling a condition depend workflow.
154+
"""
155+
from .testdata.condition_depend import pipeline
156+
self._test_pipeline_workflow(pipeline, 'condition_depend.yaml', skip_noninlined=True)
157+
151158
def test_recur_cond_workflow(self):
152159
"""
153160
Test compiling a recurive condition workflow.
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
# Copyright 2022 kubeflow.org
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# This use case was discussed in https://github.com/kubeflow/kfp-tekton/issues/845
16+
17+
from kfp import dsl, components
18+
19+
op1_yaml = '''\
20+
name: 'my-in-coop1'
21+
inputs:
22+
- {name: item, type: Integer}
23+
- {name: param, type: Integer}
24+
implementation:
25+
container:
26+
image: library/bash:4.4.23
27+
command: ['sh', '-c']
28+
args:
29+
- |
30+
set -e
31+
echo op1 "$0" "$1"
32+
- {inputValue: item}
33+
- {inputValue: param}
34+
'''
35+
36+
37+
def false() -> str:
38+
return 'false'
39+
40+
41+
false_op = components.create_component_from_func(
42+
false, base_image='python:alpine3.6')
43+
44+
45+
@dsl.pipeline(name='pipeline')
46+
def pipeline(param: int = 10):
47+
condition_op = false_op()
48+
cond = condition_op.output
49+
with dsl.Condition(cond == 'true'):
50+
op2_template = components.load_component_from_text(op1_yaml)
51+
op2 = op2_template(1, param)
52+
53+
op3_template = components.load_component_from_text(op1_yaml)
54+
op3 = op3_template(1, param)
55+
op4_template = components.load_component_from_text(op1_yaml)
56+
op4 = op4_template(1, param)
57+
op4.after(op2)
58+
op3.after(op4)
59+
60+
61+
if __name__ == '__main__':
62+
from kfp_tekton.compiler import TektonCompiler as Compiler
63+
from kfp_tekton.compiler.pipeline_utils import TektonPipelineConf
64+
tekton_pipeline_conf = TektonPipelineConf()
65+
tekton_pipeline_conf.set_tekton_inline_spec(True)
66+
Compiler().compile(pipeline, __file__.replace('.py', '.yaml'), tekton_pipeline_conf=tekton_pipeline_conf)
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
# Copyright 2021 kubeflow.org
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
apiVersion: tekton.dev/v1beta1
16+
kind: PipelineRun
17+
metadata:
18+
name: pipeline
19+
annotations:
20+
tekton.dev/output_artifacts: '{"false": [{"key": "artifacts/$PIPELINERUN/false/Output.tgz",
21+
"name": "false-Output", "path": "/tmp/outputs/Output/data"}]}'
22+
tekton.dev/input_artifacts: '{}'
23+
tekton.dev/artifact_bucket: mlpipeline
24+
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
25+
tekton.dev/artifact_endpoint_scheme: http://
26+
tekton.dev/artifact_items: '{"false": [["Output", "$(results.output.path)"]],
27+
"my-in-coop1": [], "my-in-coop1-2": [], "my-in-coop1-3": []}'
28+
sidecar.istio.io/inject: "false"
29+
pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
30+
pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "10", "name": "param",
31+
"optional": true, "type": "Integer"}], "name": "pipeline"}'
32+
spec:
33+
params:
34+
- name: param
35+
value: '10'
36+
pipelineSpec:
37+
params:
38+
- name: param
39+
default: '10'
40+
tasks:
41+
- name: "false"
42+
taskSpec:
43+
steps:
44+
- name: main
45+
args:
46+
- '----output-paths'
47+
- $(results.output.path)
48+
command:
49+
- sh
50+
- -ec
51+
- |
52+
program_path=$(mktemp)
53+
printf "%s" "$0" > "$program_path"
54+
python3 -u "$program_path" "$@"
55+
- |
56+
def false():
57+
return 'false'
58+
59+
def _serialize_str(str_value: str) -> str:
60+
if not isinstance(str_value, str):
61+
raise TypeError('Value "{}" has type "{}" instead of str.'.format(
62+
str(str_value), str(type(str_value))))
63+
return str_value
64+
65+
import argparse
66+
_parser = argparse.ArgumentParser(prog='False', description='')
67+
_parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1)
68+
_parsed_args = vars(_parser.parse_args())
69+
_output_files = _parsed_args.pop("_output_paths", [])
70+
71+
_outputs = false(**_parsed_args)
72+
73+
_outputs = [_outputs]
74+
75+
_output_serializers = [
76+
_serialize_str,
77+
78+
]
79+
80+
import os
81+
for idx, output_file in enumerate(_output_files):
82+
try:
83+
os.makedirs(os.path.dirname(output_file))
84+
except OSError:
85+
pass
86+
with open(output_file, 'w') as f:
87+
f.write(_output_serializers[idx](_outputs[idx]))
88+
image: python:alpine3.6
89+
results:
90+
- name: output
91+
type: string
92+
description: /tmp/outputs/Output/data
93+
metadata:
94+
labels:
95+
pipelines.kubeflow.org/pipelinename: ''
96+
pipelines.kubeflow.org/generation: ''
97+
pipelines.kubeflow.org/cache_enabled: "true"
98+
annotations:
99+
pipelines.kubeflow.org/component_spec_digest: '{"name": "False", "outputs":
100+
[{"name": "Output", "type": "String"}], "version": "False@sha256=ab6d70d69626be9a388e3d6d92c4eae87095dc005400a41e6613f4b855c694f6"}'
101+
tekton.dev/template: ''
102+
timeout: 525600m
103+
- name: my-in-coop1
104+
params:
105+
- name: param
106+
value: $(params.param)
107+
taskSpec:
108+
steps:
109+
- name: main
110+
args:
111+
- |
112+
set -e
113+
echo op1 "$0" "$1"
114+
- '1'
115+
- $(inputs.params.param)
116+
command:
117+
- sh
118+
- -c
119+
image: library/bash:4.4.23
120+
params:
121+
- name: param
122+
metadata:
123+
labels:
124+
pipelines.kubeflow.org/pipelinename: ''
125+
pipelines.kubeflow.org/generation: ''
126+
pipelines.kubeflow.org/cache_enabled: "true"
127+
annotations:
128+
pipelines.kubeflow.org/component_spec_digest: '{"name": "my-in-coop1",
129+
"outputs": [], "version": "my-in-coop1@sha256=8ccab3a28a39a406554d964865f2ccb0aed854a43b6de827f613eff2bccd6f8f"}'
130+
tekton.dev/template: ''
131+
when:
132+
- input: $(tasks.condition-1.results.outcome)
133+
operator: in
134+
values:
135+
- "true"
136+
timeout: 525600m
137+
- name: my-in-coop1-2
138+
params:
139+
- name: param
140+
value: $(params.param)
141+
taskSpec:
142+
steps:
143+
- name: main
144+
args:
145+
- |
146+
set -e
147+
echo op1 "$0" "$1"
148+
- '1'
149+
- $(inputs.params.param)
150+
command:
151+
- sh
152+
- -c
153+
image: library/bash:4.4.23
154+
params:
155+
- name: param
156+
metadata:
157+
labels:
158+
pipelines.kubeflow.org/pipelinename: ''
159+
pipelines.kubeflow.org/generation: ''
160+
pipelines.kubeflow.org/cache_enabled: "true"
161+
annotations:
162+
pipelines.kubeflow.org/component_spec_digest: '{"name": "my-in-coop1",
163+
"outputs": [], "version": "my-in-coop1@sha256=8ccab3a28a39a406554d964865f2ccb0aed854a43b6de827f613eff2bccd6f8f"}'
164+
tekton.dev/template: ''
165+
runAfter:
166+
- my-in-coop1-3
167+
timeout: 525600m
168+
when:
169+
- input: $(tasks.condition-1.results.outcome)
170+
operator: in
171+
values:
172+
- "true"
173+
- name: my-in-coop1-3
174+
params:
175+
- name: param
176+
value: $(params.param)
177+
taskSpec:
178+
steps:
179+
- name: main
180+
args:
181+
- |
182+
set -e
183+
echo op1 "$0" "$1"
184+
- '1'
185+
- $(inputs.params.param)
186+
command:
187+
- sh
188+
- -c
189+
image: library/bash:4.4.23
190+
params:
191+
- name: param
192+
metadata:
193+
labels:
194+
pipelines.kubeflow.org/pipelinename: ''
195+
pipelines.kubeflow.org/generation: ''
196+
pipelines.kubeflow.org/cache_enabled: "true"
197+
annotations:
198+
pipelines.kubeflow.org/component_spec_digest: '{"name": "my-in-coop1",
199+
"outputs": [], "version": "my-in-coop1@sha256=8ccab3a28a39a406554d964865f2ccb0aed854a43b6de827f613eff2bccd6f8f"}'
200+
tekton.dev/template: ''
201+
runAfter:
202+
- my-in-coop1
203+
timeout: 525600m
204+
when:
205+
- input: $(tasks.condition-1.results.outcome)
206+
operator: in
207+
values:
208+
- "true"
209+
- name: condition-1
210+
params:
211+
- name: operand1
212+
value: $(tasks.false.results.output)
213+
- name: operand2
214+
value: "true"
215+
- name: operator
216+
value: ==
217+
taskSpec:
218+
results:
219+
- name: outcome
220+
type: string
221+
description: Conditional task outcome
222+
params:
223+
- name: operand1
224+
- name: operand2
225+
- name: operator
226+
steps:
227+
- name: main
228+
command:
229+
- sh
230+
- -ec
231+
- program_path=$(mktemp); printf "%s" "$0" > "$program_path"; python3 -u
232+
"$program_path" "$1" "$2"
233+
args:
234+
- |
235+
import sys
236+
input1=str.rstrip(sys.argv[1])
237+
input2=str.rstrip(sys.argv[2])
238+
try:
239+
input1=int(input1)
240+
input2=int(input2)
241+
except:
242+
input1=str(input1)
243+
outcome="true" if (input1 $(inputs.params.operator) input2) else "false"
244+
f = open("/tekton/results/outcome", "w")
245+
f.write(outcome)
246+
f.close()
247+
- $(inputs.params.operand1)
248+
- $(inputs.params.operand2)
249+
image: python:alpine3.6
250+
timeout: 525600m

sdk/python/tests/compiler/testdata/loop_with_conditional_dependency.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,11 @@ spec:
147147
runAfter:
148148
- print-1
149149
timeout: 525600m
150+
when:
151+
- input: $(tasks.condition-cel.results.outcome)
152+
operator: in
153+
values:
154+
- "true"
150155
- runAfter:
151156
- print-1
152157
name: empty-loop-for-loop-2

0 commit comments

Comments
 (0)