
Commit 743f436

Merge pull request opendatahub-io#10 from kubeflow/master
[pull] master from kubeflow:master
2 parents: fe4734e + 126021b

5 files changed: +341 additions, −70 deletions


sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py

Lines changed: 22 additions & 0 deletions
@@ -764,3 +764,25 @@ def _append_original_pr_name_env(task_template):
     for step in task_template['taskSpec']['steps']:
         if step['name'] == 'main':
             _append_original_pr_name_env_to_step(step)
+
+
+def fix_big_data_passing_using_volume(workflow, pipeline_conf):
+    if workflow['spec'].get('workspaces') and pipeline_conf.data_passing_method._volume is not None:
+        volume_dict = pipeline_conf.data_passing_method._volume.to_dict()
+        volume_dict_pvc = volume_dict.get('persistent_volume_claim')
+        if volume_dict_pvc:
+            for workspace in workflow['spec']['workspaces']:
+                if workspace['name'] == workflow['metadata']['name']:
+                    workspace.pop('volumeClaimTemplate')
+                    temp_vc = {}
+                    for key, value in volume_dict_pvc.items():
+                        if value is not None:
+                            if key == 'claim_name':
+                                temp_vc['claimName'] = value
+                            if key == 'read_only':
+                                temp_vc['readOnly'] = value
+                    workspace['persistentVolumeClaim'] = temp_vc
+                    if pipeline_conf.data_passing_method._path_prefix is not None:
+                        workspace['subPath'] = pipeline_conf.data_passing_method._path_prefix
+                    break
+    return workflow
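In short, the rewriter swaps the generated volumeClaimTemplate on the workspace named after the workflow for the user-supplied PVC, translating the kubernetes client's snake_case keys (claim_name, read_only) into the camelCase form Tekton expects (claimName, readOnly), and applies the optional path_prefix as a subPath. A minimal sketch of the effect, using the kfp PipelineConf and data_passing_methods APIs that appear elsewhere in this commit; the compiled-workflow stub is hypothetical:

from kubernetes.client.models import V1Volume, V1PersistentVolumeClaimVolumeSource
from kfp.dsl import PipelineConf, data_passing_methods

from kfp_tekton.compiler._data_passing_rewriter import fix_big_data_passing_using_volume

pipeline_conf = PipelineConf()
pipeline_conf.data_passing_method = data_passing_methods.KubernetesVolume(
    volume=V1Volume(
        name='data',
        persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
            claim_name='data-volume')),
    path_prefix='artifact_data/')

# Hypothetical stub of a compiled PipelineRun: the rewriter only touches the
# workspace whose name matches the workflow's metadata name.
workflow = {
    'metadata': {'name': 'artifact-passing-pipeline'},
    'spec': {'workspaces': [{
        'name': 'artifact-passing-pipeline',
        'volumeClaimTemplate': {'spec': {'accessModes': ['ReadWriteMany']}},
    }]},
}

workflow = fix_big_data_passing_using_volume(workflow, pipeline_conf)
print(workflow['spec']['workspaces'][0])
# {'name': 'artifact-passing-pipeline',
#  'persistentVolumeClaim': {'claimName': 'data-volume'},
#  'subPath': 'artifact_data/'}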

sdk/python/kfp_tekton/compiler/compiler.py

Lines changed: 4 additions & 1 deletion
@@ -39,7 +39,7 @@
 from kfp.dsl._metadata import _extract_pipeline_metadata
 # KFP-Tekton imports
 from kfp_tekton.compiler import __tekton_api_version__ as tekton_api_version
-from kfp_tekton.compiler._data_passing_rewriter import fix_big_data_passing, BIG_DATA_PATH_FORMAT
+from kfp_tekton.compiler._data_passing_rewriter import fix_big_data_passing, fix_big_data_passing_using_volume, BIG_DATA_PATH_FORMAT
 from kfp_tekton.compiler._k8s_helper import convert_k8s_obj_to_json, sanitize_k8s_name, sanitize_k8s_object
 from kfp_tekton.compiler._op_to_template import _op_to_template
 from kfp_tekton.compiler._tekton_handler import _handle_tekton_pipeline_variables, _handle_tekton_custom_task, _process_argo_vars

@@ -1519,6 +1519,9 @@ def _create_workflow(self,
 
         workflow = fix_big_data_passing(workflow, self.loops_pipeline, '-'.join(self._group_names[:-1] + [""]))
 
+        if pipeline_conf and pipeline_conf.data_passing_method is not None:
+            workflow = fix_big_data_passing_using_volume(workflow, pipeline_conf)
+
         workflow.setdefault('metadata', {}).setdefault('annotations', {})['pipelines.kubeflow.org/pipeline_spec'] = \
             json.dumps(pipeline_meta.to_dict(), sort_keys=True)
 
sdk/python/tests/compiler/compiler_tests.py

Lines changed: 7 additions & 0 deletions
@@ -639,6 +639,13 @@ def test_big_data_workflow(self):
         from .testdata.big_data_passing import file_passing_pipelines
         self._test_pipeline_workflow(file_passing_pipelines, 'big_data_passing.yaml', skip_noninlined=True)
 
+    def test_big_data_using_volume_workflow(self):
+        """
+        Test compiling a big data passing workflow that uses a volume-based data passing method.
+        """
+        from .testdata.artifact_passing_using_volume import artifact_passing_pipeline
+        self._test_pipeline_workflow(artifact_passing_pipeline, 'artifact_passing_using_volume.yaml', skip_noninlined=True)
+
     def test_create_component_from_func_workflow(self):
         """
         Test compiling a creating component from func workflow.
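To exercise just the new case, something like the following should work, assuming the suite is run with pytest (which also collects unittest-style tests) from the sdk/python/tests directory:

import pytest

# Select the new test by name; -k matches against collected test names.
pytest.main(['compiler/compiler_tests.py',
             '-k', 'test_big_data_using_volume_workflow'])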

sdk/python/tests/compiler/testdata/artifact_passing_using_volume.py

Lines changed: 49 additions & 11 deletions
@@ -16,35 +16,73 @@
 
 from pathlib import Path
 
-import kfp
-from kfp.components import load_component_from_file
+import kfp as kfp
+from kfp.components import load_component_from_file, create_component_from_func
+from typing import NamedTuple
 
 test_data_dir = Path(__file__).parent / 'test_data'
-producer_op = load_component_from_file(str(test_data_dir / 'produce_2.component.yaml'))
-processor_op = load_component_from_file(str(test_data_dir / 'process_2_2.component.yaml'))
-consumer_op = load_component_from_file(str(test_data_dir / 'consume_2.component.yaml'))
+producer_op = load_component_from_file(
+    str(test_data_dir / 'produce_2.component.yaml'))
+processor_op = load_component_from_file(
+    str(test_data_dir / 'process_2_2.component.yaml'))
+consumer_op = load_component_from_file(
+    str(test_data_dir / 'consume_2.component.yaml'))
+
+
+def metadata_and_metrics() -> NamedTuple(
+    "Outputs",
+    [("mlpipeline_ui_metadata", "UI_metadata"), ("mlpipeline_metrics", "Metrics"
+     )],
+):
+    metadata = {
+        "outputs": [{
+            "storage": "inline",
+            "source": "*this should be bold*",
+            "type": "markdown"
+        }]
+    }
+    metrics = {
+        "metrics": [
+            {
+                "name": "train-accuracy",
+                "numberValue": 0.9,
+            },
+            {
+                "name": "test-accuracy",
+                "numberValue": 0.7,
+            },
+        ]
+    }
+    from collections import namedtuple
+    import json
+
+    return namedtuple("output",
+                      ["mlpipeline_ui_metadata", "mlpipeline_metrics"])(
+                          json.dumps(metadata), json.dumps(metrics))
 
 
 @kfp.dsl.pipeline()
 def artifact_passing_pipeline():
     producer_task = producer_op()
-    processor_task = processor_op(producer_task.outputs['output_1'], producer_task.outputs['output_2'])
-    consumer_task = consumer_op(processor_task.outputs['output_1'], processor_task.outputs['output_2'])
+    processor_task = processor_op(producer_task.outputs['output_1'],
+                                  producer_task.outputs['output_2'])
+    consumer_task = consumer_op(processor_task.outputs['output_1'],
+                                processor_task.outputs['output_2'])
 
+    markdown_task = create_component_from_func(func=metadata_and_metrics)()
     # This line is only needed for compiling using dsl-compile to work
-    kfp.dsl.get_pipeline_conf().data_passing_method = volume_based_data_passing_method
+    kfp.dsl.get_pipeline_conf(
+    ).data_passing_method = volume_based_data_passing_method
 
 
 from kubernetes.client.models import V1Volume, V1PersistentVolumeClaimVolumeSource
 from kfp.dsl import data_passing_methods
 
-
 volume_based_data_passing_method = data_passing_methods.KubernetesVolume(
     volume=V1Volume(
        name='data',
        persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
-            claim_name='data-volume',
-        ),
+            claim_name='data-volume',),
     ),
     path_prefix='artifact_data/',
 )
