Skip to content

Commit fe4734e

Browse files
authored
Merge pull request opendatahub-io#9 from kubeflow/master
[pull] master from kubeflow:master
2 parents 429a965 + 00339aa commit fe4734e

17 files changed

+285
-146
lines changed

sdk/python/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,8 @@ the Tekton YAML instead of Argo YAML. Since the KFP SDK was not designed and imp
108108
_monkey-patching_ was used to replace non-class methods and functions at runtime.
109109

110110
In order for the _monkey patch_ to work properly, the `kfp-tekton` compiler source code has to be aligned with a
111-
specific version of the `kfp` SDK compiler. As of now the `kfp-tekton` SDK version is `1.2.2` which is aligned with KFP
112-
SDK version [`1.8.12`](https://pypi.org/project/kfp/1.8.12/).
111+
specific version of the `kfp` SDK compiler. As of now, the `kfp-tekton` SDK version is `1.2.3`, which is aligned with KFP
112+
SDK version [`1.8.13`](https://pypi.org/project/kfp/1.8.13/).
113113

114114

115115
## Adding New Code

sdk/python/kfp_tekton/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
__version__ = '1.2.2'
15+
__version__ = '1.2.3'
1616

1717
from ._client import TektonClient # noqa F401
1818
from .k8s_client_helper import env_from_secret # noqa F401

sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,22 @@ def big_data_passing_pipeline(name: str, template: dict, inputs_tasks: set(),
448448
"name": task.get('name'),
449449
"workspace": pipeline_name
450450
})
451+
artifact_output_list = task.get('taskSpec', {}).get('metadata', {}).get('annotations', {}).get(
452+
ARTIFACT_OUTPUTLIST_ANNOTATION_KEY, '')
453+
if artifact_output_list:
454+
tmp_list = set()
455+
for output in json.loads(artifact_output_list):
456+
tmp_list.add(sanitize_k8s_name(output))
457+
for task_output in task.get('taskSpec', {}).get('results', []):
458+
if task_output.get('name') in tmp_list:
459+
if not task.setdefault('workspaces', []):
460+
task['workspaces'].append({
461+
"name": task.get('name'),
462+
"workspace": pipeline_name
463+
})
464+
pipeline_workspaces.add(pipeline_name)
465+
break
466+
451467
if pipeline_name in pipeline_workspaces:
452468
# Add workspaces to pipeline
453469
if not pipeline_spec.setdefault('workspaces', []):
@@ -511,7 +527,7 @@ def big_data_passing_tasks(prname: str, task: dict, pipelinerun_template: dict,
511527
# For child nodes to know the taskrun name, it has to be passed to results via the /tekton/results emptyDir
512528
if not appended_taskrun_name:
513529
copy_taskrun_name_step = _get_base_step('output-taskrun-name')
514-
copy_taskrun_name_step['script'] += 'echo -n "%s" > $(results.taskrun-name.path)\n' % ("$(context.taskRun.name)")
530+
copy_taskrun_name_step['command'].append('echo -n "$(context.taskRun.name)" > "$(results.taskrun-name.path)"')
515531
task['taskSpec']['results'].append({"name": "taskrun-name", "type": "string"})
516532
task['taskSpec']['steps'].append(copy_taskrun_name_step)
517533
_append_original_pr_name_env(task)
@@ -608,7 +624,7 @@ def append_taskrun_params(task_name_append: str):
608624
if task_spec.get('results', []):
609625
copy_results_artifact_step = _get_base_step('copy-results-artifacts')
610626
copy_results_artifact_step['onError'] = 'continue' # supported by v0.27+ of tekton.
611-
copy_results_artifact_step['script'] += 'TOTAL_SIZE=0\n'
627+
script = "set -exo pipefail\nTOTAL_SIZE=0\n"
612628
for result in task_spec['results']:
613629
if task['name'] in artifact_items:
614630
artifact_i = artifact_items[task['name']]
@@ -618,14 +634,17 @@ def append_taskrun_params(task_name_append: str):
618634
dst = '$(results.%s.path)' % sanitize_k8s_name(result['name'])
619635
if artifact_name == result['name'] and src != dst:
620636
add_copy_results_artifacts_step = True
621-
copy_results_artifact_step['script'] += (
637+
script += (
622638
'ARTIFACT_SIZE=`wc -c %s | awk \'{print $1}\'`\n' % src +
623639
'TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)\n' +
624640
'touch ' + dst + '\n' + # create an empty file by default.
625641
'if [[ $TOTAL_SIZE -lt 3072 ]]; then\n' +
626-
' cp ' + src + ' ' + dst + '\n' +
642+
' if ! awk "/[^[:print:]]/{f=1} END{exit !f}" %s; then\n' % src +
643+
' cp ' + src + ' ' + dst + '\n' +
644+
' fi\n'
627645
'fi\n'
628646
)
647+
copy_results_artifact_step['command'].append(script)
629648
_append_original_pr_name_env_to_step(copy_results_artifact_step)
630649
if add_copy_results_artifacts_step:
631650
task['taskSpec']['steps'].append(copy_results_artifact_step)
@@ -649,17 +668,19 @@ def input_artifacts_tasks_pr_params(template: dict, artifact: dict) -> dict:
649668
task_name = template.get('name')
650669
task_spec = template.get('taskSpec', {})
651670
task_params = task_spec.get('params', [])
671+
script = "set -exo pipefail\n"
652672
for task_param in task_params:
653673
# For pipeline parameter input artifacts, it will never come from another task because pipeline
654674
# params are global parameters. Thus, task_name will always be the executing task name.
655675
workspaces_parameter = '$(workspaces.%s.path)/%s/%s/%s' % (
656676
task_name, BIG_DATA_MIDPATH, "$(context.taskRun.name)", task_param.get('name'))
657677
if 'raw' in artifact:
658-
copy_inputs_step['script'] += 'mkdir -p %s\n' % pathlib.Path(workspaces_parameter).parent
659-
copy_inputs_step['script'] += 'echo -n "%s" > %s\n' % (
678+
script += 'mkdir -p %s\n' % pathlib.Path(workspaces_parameter).parent
679+
script += 'echo -n "%s" > %s\n' % (
660680
artifact['raw']['data'], workspaces_parameter)
661681
_append_original_pr_name_env(template)
662682

683+
copy_inputs_step['command'].append(script)
663684
template['taskSpec']['steps'] = _prepend_steps(
664685
[copy_inputs_step], template['taskSpec']['steps'])
665686

@@ -674,8 +695,8 @@ def input_artifacts_tasks(template: dict, artifact: dict) -> dict:
674695
mounted_param_paths = []
675696
copy_inputs_step = _get_base_step('copy-inputs')
676697
if 'raw' in artifact:
677-
copy_inputs_step['script'] += 'echo -n "%s" > %s\n' % (
678-
artifact['raw']['data'], artifact['path'])
698+
copy_inputs_step['command'].append('set -exo pipefail\necho -n "%s" > %s\n' % (
699+
artifact['raw']['data'], artifact['path']))
679700
mount_path = artifact['path'].rsplit("/", 1)[0]
680701
if mount_path not in mounted_param_paths:
681702
_add_mount_path(artifact['name'], artifact['path'], mount_path,

sdk/python/kfp_tekton/compiler/_op_to_template.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def _get_base_step(name: str):
5353
return {
5454
'image': TEKTON_BASH_STEP_IMAGE,
5555
'name': name,
56-
'script': '#!/bin/sh\nset -exo pipefail\n'
56+
'command': ['sh', '-ec']
5757
}
5858

5959

@@ -272,6 +272,7 @@ def _process_parameters(processed_op: BaseOp,
272272
if outputs_dict.get('parameters'):
273273
template['spec']['results'] = []
274274
copy_results_step = _get_base_step('copy-results')
275+
script = "set -exo pipefail\n"
275276
for name, path in processed_op.file_outputs.items():
276277
template['spec']['results'].append({
277278
'name': name,
@@ -301,15 +302,15 @@ def _process_parameters(processed_op: BaseOp,
301302
need_copy_step = False
302303
# If file output path cannot be found/replaced, use emptyDir to copy it to the tekton/results path
303304
if need_copy_step:
304-
copy_results_step['script'] = copy_results_step['script'] + 'cp ' + path + ' $(results.%s.path);' \
305-
% sanitize_k8s_name(name) + '\n'
305+
script = script + 'cp ' + path + ' $(results.%s.path);\n' % sanitize_k8s_name(name)
306306
mount_path = path.rsplit("/", 1)[0]
307307
if mount_path not in mounted_param_paths:
308308
_add_mount_path(name, path, mount_path, volume_mount_step_template, volume_template, mounted_param_paths)
309309
# Record what artifacts are moved to result parameters.
310310
parameter_name = sanitize_k8s_name(processed_op.name + '-' + name, allow_capital_underscore=True, max_length=float('Inf'))
311311
replaced_param_list.append(parameter_name)
312312
artifact_to_result_mapping[parameter_name] = name
313+
copy_results_step['command'].append(script)
313314
return copy_results_step
314315
else:
315316
return {}

sdk/python/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ idna==2.10
6767
# via requests
6868
jsonschema==3.2.0
6969
# via kfp
70-
kfp==1.8.12
70+
kfp==1.8.13
7171
# via -r sdk/python/requirements.in
7272
kfp-pipeline-spec==0.1.16
7373
# via kfp

sdk/python/setup.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@
2020
#
2121
# To create a distribution for PyPi run:
2222
#
23-
# $ export KFP_TEKTON_VERSION=1.2.2-rc1
23+
# $ export KFP_TEKTON_VERSION=1.2.3-rc1
2424
# $ python3 setup.py sdist
2525
# $ twine check dist/kfp-tekton-${KFP_TEKTON_VERSION/-rc/rc}.tar.gz
2626
# $ twine upload --repository pypi dist/kfp-tekton-${KFP_TEKTON_VERSION/-rc/rc}.tar.gz
2727
#
2828
# ... or:
2929
#
30-
# $ make distribution KFP_TEKTON_VERSION=1.2.2-rc1
30+
# $ make distribution KFP_TEKTON_VERSION=1.2.3-rc1
3131
#
3232
# =============================================================================
3333

@@ -54,7 +54,7 @@
5454
# NOTICE, after any updates to the following, ./requirements.in should be updated
5555
# accordingly.
5656
REQUIRES = [
57-
"kfp>=1.8.10,<1.8.13",
57+
"kfp>=1.8.10,<1.8.14",
5858
]
5959

6060
TESTS_REQUIRE = [

sdk/python/tests/compiler/testdata/artifact_outputs.yaml

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,21 +64,25 @@ spec:
6464
fieldPath: metadata.labels['custom.tekton.dev/originalPipelineRun']
6565
- image: busybox
6666
name: output-taskrun-name
67-
script: |
68-
#!/bin/sh
69-
set -exo pipefail
70-
echo -n "$(context.taskRun.name)" > $(results.taskrun-name.path)
67+
command:
68+
- sh
69+
- -ec
70+
- echo -n "$(context.taskRun.name)" > "$(results.taskrun-name.path)"
7171
- image: busybox
7272
name: copy-results-artifacts
73-
script: |
74-
#!/bin/sh
73+
command:
74+
- sh
75+
- -ec
76+
- |
7577
set -exo pipefail
7678
TOTAL_SIZE=0
7779
ARTIFACT_SIZE=`wc -c $(workspaces.gcs-download.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/data | awk '{print $1}'`
7880
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
7981
touch $(results.data.path)
8082
if [[ $TOTAL_SIZE -lt 3072 ]]; then
81-
cp $(workspaces.gcs-download.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/data $(results.data.path)
83+
if ! awk "/[^[:print:]]/{f=1} END{exit !f}" $(workspaces.gcs-download.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/data; then
84+
cp $(workspaces.gcs-download.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/data $(results.data.path)
85+
fi
8286
fi
8387
onError: continue
8488
env:
@@ -111,4 +115,19 @@ spec:
111115
workspaces:
112116
- name: gcs-download
113117
timeout: 525600m
118+
workspaces:
119+
- name: gcs-download
120+
workspace: artifact-out-pipeline
121+
workspaces:
122+
- name: artifact-out-pipeline
114123
timeout: 525600m
124+
workspaces:
125+
- name: artifact-out-pipeline
126+
volumeClaimTemplate:
127+
spec:
128+
storageClassName: kfp-csi-s3
129+
accessModes:
130+
- ReadWriteMany
131+
resources:
132+
requests:
133+
storage: 2Gi

sdk/python/tests/compiler/testdata/big_data_passing.yaml

Lines changed: 51 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -97,21 +97,25 @@ spec:
9797
fieldPath: metadata.labels['custom.tekton.dev/originalPipelineRun']
9898
- image: busybox
9999
name: output-taskrun-name
100-
script: |
101-
#!/bin/sh
102-
set -exo pipefail
103-
echo -n "$(context.taskRun.name)" > $(results.taskrun-name.path)
100+
command:
101+
- sh
102+
- -ec
103+
- echo -n "$(context.taskRun.name)" > "$(results.taskrun-name.path)"
104104
- image: busybox
105105
name: copy-results-artifacts
106-
script: |
107-
#!/bin/sh
106+
command:
107+
- sh
108+
- -ec
109+
- |
108110
set -exo pipefail
109111
TOTAL_SIZE=0
110112
ARTIFACT_SIZE=`wc -c $(workspaces.repeat-line.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/output_text | awk '{print $1}'`
111113
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
112114
touch $(results.output-text.path)
113115
if [[ $TOTAL_SIZE -lt 3072 ]]; then
114-
cp $(workspaces.repeat-line.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/output_text $(results.output-text.path)
116+
if ! awk "/[^[:print:]]/{f=1} END{exit !f}" $(workspaces.repeat-line.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/output_text; then
117+
cp $(workspaces.repeat-line.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/output_text $(results.output-text.path)
118+
fi
115119
fi
116120
onError: continue
117121
env:
@@ -146,8 +150,10 @@ spec:
146150
steps:
147151
- image: busybox
148152
name: copy-inputs
149-
script: |
150-
#!/bin/sh
153+
command:
154+
- sh
155+
- -ec
156+
- |
151157
set -exo pipefail
152158
echo -n "one
153159
two
@@ -212,27 +218,33 @@ spec:
212218
fieldPath: metadata.labels['custom.tekton.dev/originalPipelineRun']
213219
- image: busybox
214220
name: output-taskrun-name
215-
script: |
216-
#!/bin/sh
217-
set -exo pipefail
218-
echo -n "$(context.taskRun.name)" > $(results.taskrun-name.path)
221+
command:
222+
- sh
223+
- -ec
224+
- echo -n "$(context.taskRun.name)" > "$(results.taskrun-name.path)"
219225
- image: busybox
220226
name: copy-results-artifacts
221-
script: |
222-
#!/bin/sh
227+
command:
228+
- sh
229+
- -ec
230+
- |
223231
set -exo pipefail
224232
TOTAL_SIZE=0
225233
ARTIFACT_SIZE=`wc -c $(workspaces.split-text-lines.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/odd_lines | awk '{print $1}'`
226234
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
227235
touch $(results.odd-lines.path)
228236
if [[ $TOTAL_SIZE -lt 3072 ]]; then
229-
cp $(workspaces.split-text-lines.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/odd_lines $(results.odd-lines.path)
237+
if ! awk "/[^[:print:]]/{f=1} END{exit !f}" $(workspaces.split-text-lines.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/odd_lines; then
238+
cp $(workspaces.split-text-lines.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/odd_lines $(results.odd-lines.path)
239+
fi
230240
fi
231241
ARTIFACT_SIZE=`wc -c $(workspaces.split-text-lines.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/even_lines | awk '{print $1}'`
232242
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
233243
touch $(results.even-lines.path)
234244
if [[ $TOTAL_SIZE -lt 3072 ]]; then
235-
cp $(workspaces.split-text-lines.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/even_lines $(results.even-lines.path)
245+
if ! awk "/[^[:print:]]/{f=1} END{exit !f}" $(workspaces.split-text-lines.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/even_lines; then
246+
cp $(workspaces.split-text-lines.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/even_lines $(results.even-lines.path)
247+
fi
236248
fi
237249
onError: continue
238250
env:
@@ -432,21 +444,25 @@ spec:
432444
fieldPath: metadata.labels['custom.tekton.dev/originalPipelineRun']
433445
- image: busybox
434446
name: output-taskrun-name
435-
script: |
436-
#!/bin/sh
437-
set -exo pipefail
438-
echo -n "$(context.taskRun.name)" > $(results.taskrun-name.path)
447+
command:
448+
- sh
449+
- -ec
450+
- echo -n "$(context.taskRun.name)" > "$(results.taskrun-name.path)"
439451
- image: busybox
440452
name: copy-results-artifacts
441-
script: |
442-
#!/bin/sh
453+
command:
454+
- sh
455+
- -ec
456+
- |
443457
set -exo pipefail
444458
TOTAL_SIZE=0
445459
ARTIFACT_SIZE=`wc -c $(workspaces.write-numbers.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/numbers | awk '{print $1}'`
446460
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
447461
touch $(results.numbers.path)
448462
if [[ $TOTAL_SIZE -lt 3072 ]]; then
449-
cp $(workspaces.write-numbers.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/numbers $(results.numbers.path)
463+
if ! awk "/[^[:print:]]/{f=1} END{exit !f}" $(workspaces.write-numbers.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/numbers; then
464+
cp $(workspaces.write-numbers.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/numbers $(results.numbers.path)
465+
fi
450466
fi
451467
onError: continue
452468
env:
@@ -600,21 +616,25 @@ spec:
600616
fieldPath: metadata.labels['custom.tekton.dev/originalPipelineRun']
601617
- image: busybox
602618
name: output-taskrun-name
603-
script: |
604-
#!/bin/sh
605-
set -exo pipefail
606-
echo -n "$(context.taskRun.name)" > $(results.taskrun-name.path)
619+
command:
620+
- sh
621+
- -ec
622+
- echo -n "$(context.taskRun.name)" > "$(results.taskrun-name.path)"
607623
- image: busybox
608624
name: copy-results-artifacts
609-
script: |
610-
#!/bin/sh
625+
command:
626+
- sh
627+
- -ec
628+
- |
611629
set -exo pipefail
612630
TOTAL_SIZE=0
613631
ARTIFACT_SIZE=`wc -c $(workspaces.sum-numbers.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/Output | awk '{print $1}'`
614632
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
615633
touch $(results.output.path)
616634
if [[ $TOTAL_SIZE -lt 3072 ]]; then
617-
cp $(workspaces.sum-numbers.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/Output $(results.output.path)
635+
if ! awk "/[^[:print:]]/{f=1} END{exit !f}" $(workspaces.sum-numbers.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/Output; then
636+
cp $(workspaces.sum-numbers.path)/artifacts/$ORIG_PR_NAME/$(context.taskRun.name)/Output $(results.output.path)
637+
fi
618638
fi
619639
onError: continue
620640
env:

0 commit comments

Comments
 (0)