Skip to content

Commit fb32672

Browse files
authored
feat(sdk): add pipeline_conf.timeout sdk support (kubeflow#1108)
* add pipeline_conf.timeout sdk support * update unit test script to take kfp config object
1 parent 05baded commit fb32672

File tree

4 files changed

+152
-5
lines changed

4 files changed

+152
-5
lines changed

sdk/python/kfp_tekton/compiler/compiler.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1610,6 +1610,11 @@ def _create_workflow(self,
16101610
if pipeline_conf and pipeline_conf.data_passing_method is not None:
16111611
workflow = fix_big_data_passing_using_volume(workflow, pipeline_conf)
16121612

1613+
if pipeline_conf and pipeline_conf.timeout > 0:
1614+
workflow['spec'].setdefault('timeouts', {'pipeline': '0s', 'tasks': '0s'})
1615+
workflow['spec']['timeouts']['tasks'] = '%ds' % pipeline_conf.timeout
1616+
workflow['spec']['timeouts']['pipeline'] = '%ds' % (pipeline_conf.timeout + DEFAULT_FINALLY_SECONDS)
1617+
16131618
workflow.setdefault('metadata', {}).setdefault('annotations', {})['pipelines.kubeflow.org/pipeline_spec'] = \
16141619
json.dumps(pipeline_meta.to_dict(), sort_keys=True)
16151620

sdk/python/tests/compiler/compiler_tests.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -634,6 +634,16 @@ def test_timeout_workflow(self):
634634
from .testdata.timeout import timeout_sample_pipeline
635635
self._test_pipeline_workflow(timeout_sample_pipeline, 'timeout.yaml', skip_noninlined=True)
636636

637+
def test_timeout_config_workflow(self):
638+
"""
639+
Test compiling a step level timeout config workflow.
640+
"""
641+
from .testdata.timeout_config import timeout_sample_pipeline
642+
from kfp import dsl
643+
pipeline_conf = dsl.PipelineConf()
644+
pipeline_conf.set_timeout(100)
645+
self._test_pipeline_workflow(timeout_sample_pipeline, 'timeout_config.yaml', pipeline_conf=pipeline_conf, skip_noninlined=True)
646+
637647
def test_display_name_workflow(self):
638648
"""
639649
Test compiling a step level timeout workflow.
@@ -847,7 +857,8 @@ def _test_pipeline_workflow_inlined_spec(self,
847857
pipeline_function,
848858
pipeline_yaml,
849859
normalize_compiler_output_function=None,
850-
tekton_pipeline_conf=TektonPipelineConf()):
860+
tekton_pipeline_conf=TektonPipelineConf(),
861+
pipeline_conf=None):
851862
test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata')
852863
golden_yaml_file = os.path.join(test_data_dir, pipeline_yaml)
853864
temp_dir = tempfile.mkdtemp()
@@ -856,7 +867,8 @@ def _test_pipeline_workflow_inlined_spec(self,
856867
try:
857868
compiler.TektonCompiler().compile(pipeline_function,
858869
compiled_yaml_file,
859-
tekton_pipeline_conf=tekton_pipeline_conf)
870+
tekton_pipeline_conf=tekton_pipeline_conf,
871+
pipeline_conf=pipeline_conf)
860872
with open(compiled_yaml_file, 'r') as f:
861873
f = normalize_compiler_output_function(
862874
f.read()) if normalize_compiler_output_function else f
@@ -870,12 +882,14 @@ def _test_pipeline_workflow(self,
870882
pipeline_yaml,
871883
normalize_compiler_output_function=None,
872884
tekton_pipeline_conf=TektonPipelineConf(),
873-
skip_noninlined=False):
885+
skip_noninlined=False,
886+
pipeline_conf=None):
874887
self._test_pipeline_workflow_inlined_spec(
875888
pipeline_function=pipeline_function,
876889
pipeline_yaml=pipeline_yaml,
877890
normalize_compiler_output_function=normalize_compiler_output_function,
878-
tekton_pipeline_conf=tekton_pipeline_conf)
891+
tekton_pipeline_conf=tekton_pipeline_conf,
892+
pipeline_conf=pipeline_conf)
879893
if not skip_noninlined:
880894
test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata')
881895
golden_yaml_file = os.path.join(test_data_dir, pipeline_yaml.replace(".yaml", "") + "_noninlined.yaml")
@@ -885,7 +899,8 @@ def _test_pipeline_workflow(self,
885899
try:
886900
compiler.TektonCompiler().compile(pipeline_function,
887901
compiled_yaml_file,
888-
tekton_pipeline_conf=tekton_pipeline_conf)
902+
tekton_pipeline_conf=tekton_pipeline_conf,
903+
pipeline_conf=pipeline_conf)
889904
with open(compiled_yaml_file, 'r') as f:
890905
f = normalize_compiler_output_function(
891906
f.read()) if normalize_compiler_output_function else f
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# Copyright 2020 kubeflow.org
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
from kfp import dsl, components
17+
18+
19+
random_failure_1Op = components.load_component_from_text("""
20+
name: random-failure
21+
description: random failure
22+
inputs:
23+
- {name: exitcodes, type: String}
24+
implementation:
25+
container:
26+
image: python:alpine3.6
27+
command:
28+
- python
29+
- -c
30+
args:
31+
- |
32+
import random; import sys; exit_code = random.choice([$0]); print(exit_code); \
33+
import time; time.sleep(30); sys.exit(exit_code)
34+
- {inputValue: exitcodes}
35+
""")
36+
37+
38+
@dsl.pipeline(
39+
name='pipeline-includes-two-steps-which-fail-randomly',
40+
description='shows how to use ContainerOp set_timeout().'
41+
)
42+
def timeout_sample_pipeline():
43+
op1 = random_failure_1Op('0,1,2,3').set_timeout(20)
44+
op2 = random_failure_1Op('0,1')
45+
46+
47+
if __name__ == '__main__':
48+
from kfp_tekton.compiler import TektonCompiler
49+
pipeline_conf = dsl.PipelineConf()
50+
pipeline_conf.set_timeout(100)
51+
TektonCompiler().compile(timeout_sample_pipeline, __file__.replace('.py', '.yaml'), pipeline_conf=pipeline_conf)
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# Copyright 2021 kubeflow.org
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
apiVersion: tekton.dev/v1beta1
16+
kind: PipelineRun
17+
metadata:
18+
name: pipeline-includes-two-steps-which-fail-randomly
19+
annotations:
20+
tekton.dev/output_artifacts: '{}'
21+
tekton.dev/input_artifacts: '{}'
22+
tekton.dev/artifact_bucket: mlpipeline
23+
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
24+
tekton.dev/artifact_endpoint_scheme: http://
25+
tekton.dev/artifact_items: '{"random-failure": [], "random-failure-2": []}'
26+
sidecar.istio.io/inject: "false"
27+
tekton.dev/template: ''
28+
pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
29+
pipelines.kubeflow.org/pipeline_spec: '{"description": "shows how to use ContainerOp
30+
set_timeout().", "name": "pipeline-includes-two-steps-which-fail-randomly"}'
31+
labels:
32+
pipelines.kubeflow.org/pipelinename: ''
33+
pipelines.kubeflow.org/generation: ''
34+
spec:
35+
pipelineSpec:
36+
tasks:
37+
- name: random-failure
38+
taskSpec:
39+
steps:
40+
- name: main
41+
args:
42+
- |
43+
import random; import sys; exit_code = random.choice([$0]); print(exit_code); import time; time.sleep(30); sys.exit(exit_code)
44+
- 0,1,2,3
45+
command:
46+
- python
47+
- -c
48+
image: python:alpine3.6
49+
metadata:
50+
labels:
51+
pipelines.kubeflow.org/cache_enabled: "true"
52+
annotations:
53+
pipelines.kubeflow.org/component_spec_digest: '{"name": "random-failure",
54+
"outputs": [], "version": "random-failure@sha256=7a3950e9d0afce355b325b09e2cd5710feb99e6233a330af889d80515c4aaac2"}'
55+
timeout: 20s
56+
- name: random-failure-2
57+
taskSpec:
58+
steps:
59+
- name: main
60+
args:
61+
- |
62+
import random; import sys; exit_code = random.choice([$0]); print(exit_code); import time; time.sleep(30); sys.exit(exit_code)
63+
- 0,1
64+
command:
65+
- python
66+
- -c
67+
image: python:alpine3.6
68+
metadata:
69+
labels:
70+
pipelines.kubeflow.org/cache_enabled: "true"
71+
annotations:
72+
pipelines.kubeflow.org/component_spec_digest: '{"name": "random-failure",
73+
"outputs": [], "version": "random-failure@sha256=7a3950e9d0afce355b325b09e2cd5710feb99e6233a330af889d80515c4aaac2"}'
74+
timeouts:
75+
pipeline: 400s
76+
tasks: 100s

0 commit comments

Comments
 (0)