Skip to content

Commit 909c4f2

Browse files
authored
fix(sdk): fix exit handler param with empty exit task (kubeflow#1015)
1 parent eaac53d commit 909c4f2

File tree

4 files changed

+290
-2
lines changed

4 files changed

+290
-2
lines changed

sdk/python/kfp_tekton/compiler/compiler.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1229,8 +1229,12 @@ def get_when_task(input_task_when, depended_conditions):
12291229
for pp in op.inputs:
12301230
if pipeline_param == pp.full_name:
12311231
# Parameters from Tekton results need to be sanitized
1232-
substitute_param = '$(tasks.%s.results.%s)' % (sanitize_k8s_name(pp.op_name),
1233-
sanitize_k8s_name(pp.name, allow_capital=True))
1232+
substitute_param = ''
1233+
if pp.op_name:
1234+
substitute_param = '$(tasks.%s.results.%s)' % (sanitize_k8s_name(pp.op_name),
1235+
sanitize_k8s_name(pp.name, allow_capital=True))
1236+
else:
1237+
substitute_param = '$(params.%s)' % pipeline_param
12341238
tp['value'] = re.sub('\$\(inputs.params.%s\)' % pipeline_param, substitute_param, tp.get('value', ''))
12351239
break
12361240
# Not necessary for Tekton execution

sdk/python/tests/compiler/compiler_tests.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,13 @@ def test_parallel_join_workflow(self):
127127
from .testdata.parallel_join import download_and_join
128128
self._test_pipeline_workflow(download_and_join, 'parallel_join.yaml', skip_noninlined=True)
129129

130+
def test_custom_task_exit_workflow(self):
131+
"""
132+
Test compiling a custom task exit workflow.
133+
"""
134+
from .testdata.custom_task_exit import test_pipeline
135+
self._test_pipeline_workflow(test_pipeline, 'custom_task_exit.yaml', skip_noninlined=True)
136+
130137
def test_big_data_multi_volumes_1_workflow(self):
131138
"""
132139
Test compiling a big data pipeline with multiple types of volumes workflow.
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
# Copyright 2022 kubeflow.org
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
from attr import dataclass, asdict, fields
17+
18+
from kfp import dsl
19+
from kfp.components import load_component_from_text
20+
from kfp_tekton.compiler import TektonCompiler
21+
from kfp_tekton.tekton import AddOnGroup
22+
23+
CEL_TASK_IMAGE_NAME = "veryunique/image:latest"
24+
from kfp_tekton.tekton import TEKTON_CUSTOM_TASK_IMAGES
25+
TEKTON_CUSTOM_TASK_IMAGES = TEKTON_CUSTOM_TASK_IMAGES.append(CEL_TASK_IMAGE_NAME)
26+
27+
28+
@dataclass
29+
class ExitHandlerFields:
30+
status: dsl.PipelineParam
31+
status_message: dsl.PipelineParam
32+
33+
def __getitem__(self, item: str) -> dsl.PipelineParam:
34+
di = asdict(self)
35+
return di.get(item) or di.get(item.replace('-', '_'))
36+
37+
38+
class ExitHandler(AddOnGroup):
39+
"""A custom OpsGroup which maps to a custom task"""
40+
def __init__(self):
41+
labels = {
42+
'pipelines.kubeflow.org/cache_enabled': 'false',
43+
}
44+
annotations = {
45+
'ws-pipelines.ibm.com/pipeline-cache-enabled': 'false',
46+
}
47+
48+
super().__init__(
49+
kind='Exception',
50+
api_version='custom.tekton.dev/v1alpha1',
51+
params={},
52+
is_finally=True,
53+
labels=labels,
54+
annotations=annotations,
55+
)
56+
57+
internal_params = {
58+
field.name: AddOnGroup.create_internal_param(field.name)
59+
for field in fields(ExitHandlerFields)
60+
}
61+
self._internal_params = internal_params
62+
self.fields = ExitHandlerFields(
63+
status=internal_params['status'],
64+
status_message=internal_params['status_message'],
65+
)
66+
67+
def __enter__(self) -> ExitHandlerFields:
68+
super().__enter__()
69+
return self.fields
70+
71+
def post_params(self, params: list) -> list:
72+
params_map = {
73+
param['name']: param for param in params
74+
}
75+
internal_params_names = set(self._internal_params.keys())
76+
params_map = {
77+
k: v for k, v in params_map.items()
78+
if k not in internal_params_names
79+
}
80+
params = list(params_map.values())
81+
params.append({
82+
'name': 'pipelinerun_name',
83+
'value': '$(context.pipelineRun.name)',
84+
})
85+
return params
86+
87+
def post_task_spec(self, task_spec: dict) -> dict:
88+
spec = task_spec.get('spec') or {}
89+
90+
pod_template = spec.setdefault('podTemplate', {})
91+
pod_template['imagePullSecrets'] = [{'name': 'ai-lifecycle'}]
92+
pod_template['automountServiceAccountToken'] = 'false'
93+
94+
pipeline_spec = spec.get('pipelineSpec') or {}
95+
params = pipeline_spec.get('params') or []
96+
params_map = {
97+
param['name']: param for param in params
98+
}
99+
params_map.update({
100+
k: {
101+
'name': k,
102+
'type': 'string',
103+
} for k in self._internal_params.keys()
104+
})
105+
params = list(params_map.values())
106+
spec = task_spec.setdefault('spec', {})
107+
spec.setdefault('pipelineSpec', {})['params'] = params
108+
task_spec['spec'] = spec
109+
return task_spec
110+
111+
112+
def PrintOp(name: str, msg: str = None):
113+
if msg is None:
114+
msg = name
115+
print_op = load_component_from_text(
116+
"""
117+
name: %s
118+
inputs:
119+
- {name: input_text, type: String, description: 'Represents an input parameter.'}
120+
outputs:
121+
- {name: output_value, type: String, description: 'Represents an output paramter.'}
122+
implementation:
123+
container:
124+
image: alpine:3.6
125+
command:
126+
- sh
127+
- -c
128+
- |
129+
set -e
130+
echo $0 > $1
131+
- {inputValue: input_text}
132+
- {outputPath: output_value}
133+
""" % name
134+
)
135+
return print_op(msg)
136+
137+
138+
@dsl.pipeline("test pipeline")
139+
def test_pipeline():
140+
with ExitHandler() as it:
141+
# PrintOp("print-err-status", it.status)
142+
cel = load_component_from_text(r"""
143+
name: cel
144+
inputs:
145+
- {name: cel-input}
146+
outputs:
147+
- {name: cel-output}
148+
implementation:
149+
container:
150+
image: veryunique/image:latest
151+
command: [cel]
152+
args:
153+
- --apiVersion
154+
- custom.tekton.dev/v1alpha1
155+
- --kind
156+
- Cel
157+
- --name
158+
- cel_123
159+
- --status
160+
- {inputValue: cel-input}
161+
- --taskSpec
162+
- '{}'
163+
""")(it.status)
164+
cel.add_pod_annotation("valid_container", "false")
165+
166+
PrintOp("print", "some-message")
167+
168+
169+
if __name__ == '__main__':
170+
TektonCompiler().compile(test_pipeline, __file__.replace('.py', '.yaml'))
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
# Copyright 2021 kubeflow.org
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
apiVersion: tekton.dev/v1beta1
16+
kind: PipelineRun
17+
metadata:
18+
name: test-pipeline
19+
annotations:
20+
tekton.dev/output_artifacts: '{"print": [{"key": "artifacts/$PIPELINERUN/print/output_value.tgz",
21+
"name": "print-output_value", "path": "/tmp/outputs/output_value/data"}]}'
22+
tekton.dev/input_artifacts: '{}'
23+
tekton.dev/artifact_bucket: mlpipeline
24+
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
25+
tekton.dev/artifact_endpoint_scheme: http://
26+
tekton.dev/artifact_items: '{"print": [["output_value", "$(results.output-value.path)"]]}'
27+
sidecar.istio.io/inject: "false"
28+
pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
29+
pipelines.kubeflow.org/pipeline_spec: '{"name": "test pipeline"}'
30+
spec:
31+
pipelineSpec:
32+
tasks:
33+
- name: print
34+
taskSpec:
35+
steps:
36+
- name: main
37+
command:
38+
- sh
39+
- -c
40+
- |
41+
set -e
42+
echo $0 > $1
43+
- some-message
44+
- $(results.output-value.path)
45+
image: alpine:3.6
46+
results:
47+
- name: output-value
48+
type: string
49+
description: /tmp/outputs/output_value/data
50+
metadata:
51+
labels:
52+
pipelines.kubeflow.org/pipelinename: ''
53+
pipelines.kubeflow.org/generation: ''
54+
pipelines.kubeflow.org/cache_enabled: "true"
55+
annotations:
56+
pipelines.kubeflow.org/component_spec_digest: '{"name": "print", "outputs":
57+
[{"description": "Represents an output paramter.", "name": "output_value",
58+
"type": "String"}], "version": "print@sha256=c6e88bb19253b3bedeb9912855f4e324700cd80285e6b625b9ebcffb58677766"}'
59+
tekton.dev/template: ''
60+
timeout: 525600m
61+
finally:
62+
- name: test-pipeline-addon-group-1
63+
params:
64+
- name: pipelinerun_name
65+
value: $(context.pipelineRun.name)
66+
taskSpec:
67+
apiVersion: custom.tekton.dev/v1alpha1
68+
kind: Exception
69+
spec:
70+
pipelineSpec:
71+
params:
72+
- name: status
73+
type: string
74+
- name: status_message
75+
type: string
76+
tasks:
77+
- name: cel
78+
params:
79+
- name: status
80+
value: $(params.status)
81+
taskSpec:
82+
apiVersion: custom.tekton.dev/v1alpha1
83+
kind: Cel
84+
spec: {}
85+
metadata:
86+
labels:
87+
pipelines.kubeflow.org/pipelinename: ''
88+
pipelines.kubeflow.org/generation: ''
89+
pipelines.kubeflow.org/cache_enabled: "true"
90+
annotations:
91+
valid_container: "false"
92+
pipelines.kubeflow.org/component_spec_digest: '{"name": "cel",
93+
"outputs": [{"name": "cel-output"}], "version": "cel@sha256=0178923386cf89d2b7f7bae1037d14efe9e2f5898d55c27420c97a81795e5a59"}'
94+
tekton.dev/template: ''
95+
timeout: 525600m
96+
podTemplate:
97+
imagePullSecrets:
98+
- name: ai-lifecycle
99+
automountServiceAccountToken: "false"
100+
metadata:
101+
labels:
102+
pipelines.kubeflow.org/pipelinename: ''
103+
pipelines.kubeflow.org/generation: ''
104+
pipelines.kubeflow.org/cache_enabled: "false"
105+
annotations:
106+
ws-pipelines.ibm.com/pipeline-cache-enabled: "false"
107+
timeout: 525600m

0 commit comments

Comments
 (0)