Skip to content

Commit 25d1166

Browse files
authored
feat(backend): add new annotation flags to enable artifact tracking at pipeline and task level (kubeflow#1065)
* add new annotation flags to enable artifact tracking at pipeline and task level * fix artifact metrics bug for new tekton version * fix path bug * fix path bug * fix path bug * fix path * fix feature flag * add feature doc
1 parent f1ed822 commit 25d1166

File tree

4 files changed

+40
-4
lines changed

4 files changed

+40
-4
lines changed

backend/src/apiserver/common/const.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ const (
8080
ArtifactBucketAnnotation string = "tekton.dev/artifact_bucket"
8181
ArtifactEndpointAnnotation string = "tekton.dev/artifact_endpoint"
8282
ArtifactEndpointSchemeAnnotation string = "tekton.dev/artifact_endpoint_scheme"
83+
TrackArtifactAnnotation string = "tekton.dev/track_artifact"
84+
TrackStepArtifactAnnotation string = "tekton.dev/track_step_artifact"
8385
)
8486

8587
// For backward compatibility. Remove after 0.3 release

backend/src/apiserver/template/tekton_template.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -262,8 +262,17 @@ func (t *Tekton) injectArchivalStep(workflow util.Workflow, artifactItemsJSON ma
262262
injectDefaultScript := common.IsInjectDefaultScript()
263263
copyStepTemplate := common.GetCopyStepTemplate()
264264

265+
artifactAnnotation, exists := workflow.ObjectMeta.Annotations[common.TrackArtifactAnnotation]
266+
if exists && strings.ToLower(artifactAnnotation) == "true" {
267+
trackArtifacts = true
268+
}
265269
if task.TaskSpec != nil && task.TaskSpec.Steps != nil {
266-
if (hasArtifacts && len(artifacts) > 0 && trackArtifacts) || archiveLogs || (hasArtifacts && len(artifacts) > 0 && stripEOF) {
270+
injectStepArtifacts := false
271+
stepArtifactAnnotation, exists := task.TaskSpec.Metadata.Annotations[common.TrackStepArtifactAnnotation]
272+
if trackArtifacts || (exists && strings.ToLower(stepArtifactAnnotation) == "true") {
273+
injectStepArtifacts = true
274+
}
275+
if (hasArtifacts && len(artifacts) > 0 && injectStepArtifacts) || archiveLogs || (hasArtifacts && len(artifacts) > 0 && stripEOF) {
267276
artifactScript := common.GetArtifactScript()
268277
if archiveLogs {
269278
// Logging volumes
@@ -311,7 +320,7 @@ func (t *Tekton) injectArchivalStep(workflow util.Workflow, artifactItemsJSON ma
311320

312321
// Process the artifacts into minimum sh commands if running with minimum linux kernel
313322
if injectDefaultScript {
314-
artifactScript = t.injectDefaultScript(workflow, artifactScript, artifacts, hasArtifacts, archiveLogs, trackArtifacts, stripEOF, moveStep)
323+
artifactScript = t.injectDefaultScript(workflow, artifactScript, artifacts, hasArtifacts, archiveLogs, injectStepArtifacts, stripEOF, moveStep)
315324
}
316325

317326
// Define post-processing step
@@ -357,9 +366,14 @@ func (t *Tekton) injectDefaultScript(workflow util.Workflow, artifactScript stri
357366
// Upload Artifacts if the artifact is enabled and the annoations are present
358367
if hasArtifacts && len(artifacts) > 0 && trackArtifacts {
359368
for _, artifact := range artifacts {
369+
artifactBaseName := fmt.Sprintf("\"$(basename \"%s\")", artifact[1])
370+
artifactPathName := fmt.Sprintf("\"%s/%s", common.GetPath4InternalResults(), artifactBaseName)
371+
if artifact[0] == "mlpipeline-ui-metadata" || artifact[0] == "mlpipeline-metrics" {
372+
artifactPathName = fmt.Sprintf("\"%s\"", artifact[1])
373+
}
360374
if len(artifact) == 2 {
361-
artifactScript += fmt.Sprintf("push_artifact \"%s\" \"%s/\"$(basename \"%s\")\n",
362-
artifact[0], common.GetPath4InternalResults(), artifact[1])
375+
artifactScript += fmt.Sprintf("push_artifact \"%s\" %s\n",
376+
artifact[0], artifactPathName)
363377
} else {
364378
glog.Warningf("Artifact annotations are missing for run %v.", workflow.Name)
365379
}

samples/lightweight-component/calc_pipeline.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ def calc_pipeline(
7272
#Passing a task output reference as operation arguments
7373
#For an operation with a single return value, the output reference can be accessed using `task.output` or `task.outputs['output_name']` syntax
7474
divmod_task = divmod_op(add_task.output, b)
75+
divmod_task.add_pod_annotation("tekton.dev/track_step_artifact", "true")
7576

7677
#For an operation with a multiple return values, the output references can be accessed using `task.outputs['output_name']` syntax
7778
result_task = add_op(divmod_task.outputs['quotient'], c)

sdk/FEATURES.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ and test pipelines found in the KFP repository.
3131
- [Input Artifacts](#input-artifacts)
3232
- [Output Artifacts](#output-artifacts)
3333
- [Caching](#caching)
34+
- [Enforce Artifact Tracking](#enforce-artifact-tracking)
3435
- [Features with Limitations](#features-with-limitations)
3536
- [Variable Substitutions](#variable-substitutions)
3637

@@ -256,6 +257,24 @@ By default compiling a pipeline will add metadata annotations and labels so that
256257

257258
The specific annotations and labels that are added to the task spec metadata to enable caching are: `annotations={'tekton.dev/template': ""}` and `labels={'pipelines.kubeflow.org/cache_enabled': 'true', 'pipelines.kubeflow.org/pipelinename': '', 'pipelines.kubeflow.org/generation': ''}`.
258259

260+
### Enforce Artifact Tracking
261+
262+
If you pipelines require artifact tracking in order to run, enforce artfact tracking to be always on for your pipelines using one of the two ways.
263+
264+
Enable by adding a new pipeline annotation that enforce artifact tracking for the whole pipeline:
265+
```python
266+
import kfp_tekton
267+
from kfp_tekton.compiler import TektonCompiler
268+
pipeline_conf = kfp_tekton.compiler.pipeline_utils.TektonPipelineConf()
269+
pipeline_conf.add_pipeline_annotation("tekton.dev/track_artifact", 'true')
270+
TektonCompiler().compile(echo_pipeline, 'echo_pipeline.yaml', tekton_pipeline_conf=pipeline_conf)
271+
```
272+
273+
Enable by adding a new task annotation that enforce artifact tracking for a specific task:
274+
```python
275+
task.add_pod_annotation("tekton.dev/track_step_artifact", "true")
276+
```
277+
259278
## Features with Limitations
260279

261280
Below are the features that have certain limitation in the current Tekton release.

0 commit comments

Comments
 (0)