Skip to content

Commit 10488d6

Browse files
juliev0agilgur5
andcommitted
fix: don't necessarily include all artifacts from templates in node outputs (argoproj#13066)
Signed-off-by: Julie Vogelman <[email protected]> Co-authored-by: Anton Gilgur <[email protected]> (cherry picked from commit 2ca4841)
1 parent c2204ae commit 10488d6

File tree

6 files changed

+159
-40
lines changed

6 files changed

+159
-40
lines changed

cmd/argoexec/commands/wait.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,17 @@ func waitContainer(ctx context.Context) error {
5757
}
5858

5959
// Saving output artifacts
60-
err = wfExecutor.SaveArtifacts(bgCtx)
60+
artifacts, err := wfExecutor.SaveArtifacts(bgCtx)
6161
if err != nil {
6262
wfExecutor.AddError(err)
6363
}
6464

6565
// Save log artifacts
6666
logArtifacts := wfExecutor.SaveLogs(bgCtx)
67+
artifacts = append(artifacts, logArtifacts...)
6768

6869
// Try to upsert TaskResult. If it fails, we will try to update the Pod's Annotations
69-
err = wfExecutor.ReportOutputs(bgCtx, logArtifacts)
70+
err = wfExecutor.ReportOutputs(bgCtx, artifacts)
7071
if err != nil {
7172
wfExecutor.AddError(err)
7273
}

test/e2e/artifacts_test.go

Lines changed: 98 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,52 @@ func (s *ArtifactsSuite) TestGlobalArtifactPassing() {
128128
}
129129

130130
type artifactState struct {
131-
key string
132-
bucketName string
131+
artifactLocation s3Location
132+
133133
deletedAtWFCompletion bool
134134
deletedAtWFDeletion bool
135135
}
136136

137+
type s3Location struct {
138+
bucketName string
139+
// specify one of these two:
140+
specifiedKey string // exact key is known
141+
derivedKey *artifactDerivedKey // exact key needs to be derived
142+
}
143+
144+
type artifactDerivedKey struct {
145+
templateName string
146+
artifactName string
147+
}
148+
149+
func (al *s3Location) getS3Key(wf *wfv1.Workflow) (string, error) {
150+
if al.specifiedKey == "" && al.derivedKey == nil {
151+
panic(fmt.Sprintf("invalid artifactLocation: %+v, must have specifiedKey or derivedKey set", al))
152+
}
153+
154+
if al.specifiedKey != "" {
155+
return al.specifiedKey, nil
156+
}
157+
158+
// get key by finding the node in the Workflow's NodeStatus and looking at its Artifacts
159+
160+
// get node name using template
161+
n := wf.Status.Nodes.Find(func(nodeStatus wfv1.NodeStatus) bool { return nodeStatus.TemplateName == al.derivedKey.templateName })
162+
if n == nil {
163+
return "", fmt.Errorf("no node with template name=%q found in workflow %+v", al.derivedKey.templateName, wf)
164+
}
165+
for _, a := range n.Outputs.Artifacts {
166+
if a.Name == al.derivedKey.artifactName {
167+
if a.S3 == nil {
168+
return "", fmt.Errorf("didn't find expected S3 field in artifact %q: %+v", al.derivedKey.artifactName, a)
169+
}
170+
return a.S3.Key, nil
171+
}
172+
}
173+
174+
return "", fmt.Errorf("artifact named %q not found", al.derivedKey.artifactName)
175+
}
176+
137177
func (s *ArtifactsSuite) TestStoppedWorkflow() {
138178

139179
for _, tt := range []struct {
@@ -257,79 +297,97 @@ func (s *ArtifactsSuite) TestArtifactGC() {
257297
for _, tt := range []struct {
258298
workflowFile string
259299
hasGC bool
300+
workflowShouldSucceed bool
260301
expectedArtifacts []artifactState
261302
expectedGCPodsOnWFCompletion int
262303
}{
263304
{
264305
workflowFile: "@testdata/artifactgc/artgc-multi-strategy-multi-anno.yaml",
265306
hasGC: true,
307+
workflowShouldSucceed: true,
266308
expectedGCPodsOnWFCompletion: 2,
267309
expectedArtifacts: []artifactState{
268-
artifactState{"first-on-completion-1", "my-bucket-2", true, false},
269-
artifactState{"first-on-completion-2", "my-bucket-3", true, false},
270-
artifactState{"first-no-deletion", "my-bucket-3", false, false},
271-
artifactState{"second-on-deletion", "my-bucket-3", false, true},
272-
artifactState{"second-on-completion", "my-bucket-2", true, false},
310+
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "first-on-completion-1"}, true, false},
311+
artifactState{s3Location{bucketName: "my-bucket-3", specifiedKey: "first-on-completion-2"}, true, false},
312+
artifactState{s3Location{bucketName: "my-bucket-3", specifiedKey: "first-no-deletion"}, false, false},
313+
artifactState{s3Location{bucketName: "my-bucket-3", specifiedKey: "second-on-deletion"}, false, true},
314+
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "second-on-completion"}, true, false},
273315
},
274316
},
275317
// entire Workflow based on a WorkflowTemplate
276318
{
277319
workflowFile: "@testdata/artifactgc/artgc-from-template.yaml",
278320
hasGC: true,
321+
workflowShouldSucceed: true,
279322
expectedGCPodsOnWFCompletion: 1,
280323
expectedArtifacts: []artifactState{
281-
artifactState{"on-completion", "my-bucket-2", true, false},
282-
artifactState{"on-deletion", "my-bucket-2", false, true},
324+
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-completion"}, true, false},
325+
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-deletion"}, false, true},
283326
},
284327
},
285328
// entire Workflow based on a WorkflowTemplate
286329
{
287330
workflowFile: "@testdata/artifactgc/artgc-from-template-2.yaml",
288331
hasGC: true,
332+
workflowShouldSucceed: true,
289333
expectedGCPodsOnWFCompletion: 1,
290334
expectedArtifacts: []artifactState{
291-
artifactState{"on-completion", "my-bucket-2", true, false},
292-
artifactState{"on-deletion", "my-bucket-2", false, true},
335+
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-completion"}, true, false},
336+
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-deletion"}, false, true},
293337
},
294338
},
295339
// Step in Workflow references a WorkflowTemplate's template
296340
{
297341
workflowFile: "@testdata/artifactgc/artgc-step-wf-tmpl.yaml",
298342
hasGC: true,
343+
workflowShouldSucceed: true,
299344
expectedGCPodsOnWFCompletion: 1,
300345
expectedArtifacts: []artifactState{
301-
artifactState{"on-completion", "my-bucket-2", true, false},
302-
artifactState{"on-deletion", "my-bucket-2", false, true},
346+
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-completion"}, true, false},
347+
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-deletion"}, false, true},
303348
},
304349
},
305350
// Step in Workflow references a WorkflowTemplate's template
306351
{
307352
workflowFile: "@testdata/artifactgc/artgc-step-wf-tmpl-2.yaml",
308353
hasGC: true,
354+
workflowShouldSucceed: true,
309355
expectedGCPodsOnWFCompletion: 1,
310356
expectedArtifacts: []artifactState{
311-
artifactState{"on-completion", "my-bucket-2", true, false},
312-
artifactState{"on-deletion", "my-bucket-2", false, false},
357+
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-completion"}, true, false},
358+
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-deletion"}, false, false},
313359
},
314360
},
315361
// entire Workflow based on a WorkflowTemplate which has a Step that references another WorkflowTemplate's template
316362
{
317363
workflowFile: "@testdata/artifactgc/artgc-from-ref-template.yaml",
318364
hasGC: true,
365+
workflowShouldSucceed: true,
319366
expectedGCPodsOnWFCompletion: 1,
320367
expectedArtifacts: []artifactState{
321-
artifactState{"on-completion", "my-bucket-2", true, false},
322-
artifactState{"on-deletion", "my-bucket-2", false, true},
368+
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-completion"}, true, false},
369+
artifactState{s3Location{bucketName: "my-bucket-2", specifiedKey: "on-deletion"}, false, true},
323370
},
324371
},
325372
// Step in Workflow references a WorkflowTemplate's template
326373
// Workflow defines ArtifactGC but all artifacts override with "Never" so Artifact GC should not be done
327374
{
328375
workflowFile: "@testdata/artifactgc/artgc-step-wf-tmpl-no-gc.yaml",
329376
hasGC: false,
377+
workflowShouldSucceed: true,
330378
expectedGCPodsOnWFCompletion: 0,
331379
expectedArtifacts: []artifactState{},
332380
},
381+
// Workflow fails to write an artifact that's been defined as an Output
382+
{
383+
workflowFile: "@testdata/artifactgc/artgc-artifact-not-written.yaml",
384+
hasGC: true,
385+
workflowShouldSucceed: false, // artifact not being present causes Workflow to fail
386+
expectedGCPodsOnWFCompletion: 0,
387+
expectedArtifacts: []artifactState{
388+
artifactState{s3Location{bucketName: "my-bucket", derivedKey: &artifactDerivedKey{templateName: "artifact-written", artifactName: "present"}}, false, true},
389+
},
390+
},
333391
} {
334392
// for each test make sure that:
335393
// 1. the finalizer gets added
@@ -352,7 +410,7 @@ func (s *ArtifactsSuite) TestArtifactGC() {
352410
}
353411
})
354412

355-
if when.WorkflowCondition(func(wf *wfv1.Workflow) bool {
413+
if tt.workflowShouldSucceed && when.WorkflowCondition(func(wf *wfv1.Workflow) bool {
356414
return wf.Status.Phase == wfv1.WorkflowFailed || wf.Status.Phase == wfv1.WorkflowError
357415
}) {
358416
fmt.Println("can't reliably verify Artifact GC since workflow failed")
@@ -365,22 +423,27 @@ func (s *ArtifactsSuite) TestArtifactGC() {
365423
WaitForWorkflow(
366424
fixtures.WorkflowCompletionOkay(true),
367425
fixtures.Condition(func(wf *wfv1.Workflow) (bool, string) {
368-
return len(wf.Status.ArtifactGCStatus.PodsRecouped) >= tt.expectedGCPodsOnWFCompletion,
426+
return (len(wf.Status.ArtifactGCStatus.PodsRecouped) >= tt.expectedGCPodsOnWFCompletion) || (tt.expectedGCPodsOnWFCompletion == 0),
369427
fmt.Sprintf("for all %d pods to have been recouped", tt.expectedGCPodsOnWFCompletion)
370428
}))
371429

372430
then := when.Then()
373431

374432
// verify that the artifacts that should have been deleted at completion time were
375433
for _, expectedArtifact := range tt.expectedArtifacts {
434+
artifactKey, err := expectedArtifact.artifactLocation.getS3Key(when.GetWorkflow())
435+
fmt.Printf("artifact key: %q\n", artifactKey)
436+
if err != nil {
437+
panic(err)
438+
}
376439
if expectedArtifact.deletedAtWFCompletion {
377-
fmt.Printf("verifying artifact %s is deleted at completion time\n", expectedArtifact.key)
378-
then.ExpectArtifactByKey(expectedArtifact.key, expectedArtifact.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
440+
fmt.Printf("verifying artifact %s is deleted at completion time\n", artifactKey)
441+
then.ExpectArtifactByKey(artifactKey, expectedArtifact.artifactLocation.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
379442
assert.NotNil(t, err)
380443
})
381444
} else {
382-
fmt.Printf("verifying artifact %s is not deleted at completion time\n", expectedArtifact.key)
383-
then.ExpectArtifactByKey(expectedArtifact.key, expectedArtifact.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
445+
fmt.Printf("verifying artifact %s is not deleted at completion time\n", artifactKey)
446+
then.ExpectArtifactByKey(artifactKey, expectedArtifact.artifactLocation.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
384447
assert.NoError(t, err)
385448
})
386449
}
@@ -390,25 +453,32 @@ func (s *ArtifactsSuite) TestArtifactGC() {
390453

391454
when.
392455
DeleteWorkflow().
393-
WaitForWorkflowDeletion()
456+
WaitForWorkflowDeletion().
457+
Then().
458+
ExpectWorkflowDeleted()
394459

395460
when = when.RemoveFinalizers(false) // just in case - if the above test failed we need to forcibly remove the finalizer for Artifact GC
396461

397462
then = when.Then()
398463

399464
for _, expectedArtifact := range tt.expectedArtifacts {
465+
artifactKey, err := expectedArtifact.artifactLocation.getS3Key(when.GetWorkflow())
466+
fmt.Printf("artifact key: %q\n", artifactKey)
467+
if err != nil {
468+
panic(err)
469+
}
400470

401471
if expectedArtifact.deletedAtWFCompletion { // already checked this
402472
continue
403473
}
404474
if expectedArtifact.deletedAtWFDeletion {
405-
fmt.Printf("verifying artifact %s is deleted\n", expectedArtifact.key)
406-
then.ExpectArtifactByKey(expectedArtifact.key, expectedArtifact.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
475+
fmt.Printf("verifying artifact %s is deleted\n", artifactKey)
476+
then.ExpectArtifactByKey(artifactKey, expectedArtifact.artifactLocation.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
407477
assert.NotNil(t, err)
408478
})
409479
} else {
410-
fmt.Printf("verifying artifact %s is not deleted\n", expectedArtifact.key)
411-
then.ExpectArtifactByKey(expectedArtifact.key, expectedArtifact.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
480+
fmt.Printf("verifying artifact %s is not deleted\n", artifactKey)
481+
then.ExpectArtifactByKey(artifactKey, expectedArtifact.artifactLocation.bucketName, func(t *testing.T, object minio.ObjectInfo, err error) {
412482
assert.NoError(t, err)
413483
})
414484
}

test/e2e/fixtures/when.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ func (w *When) SubmitWorkflow() *When {
5858
return w
5959
}
6060

61+
func (w *When) GetWorkflow() *wfv1.Workflow {
62+
return w.wf
63+
}
64+
6165
func label(obj metav1.Object) {
6266
labels := obj.GetLabels()
6367
if labels == nil {
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
apiVersion: argoproj.io/v1alpha1
2+
kind: Workflow
3+
metadata:
4+
generateName: artgc-artifact-not-written-
5+
spec:
6+
entrypoint: entrypoint
7+
artifactGC:
8+
strategy: OnWorkflowDeletion
9+
podGC:
10+
strategy: ""
11+
templates:
12+
- name: entrypoint
13+
steps:
14+
- - name: artifact-written
15+
template: artifact-written
16+
- - name: artifact-not-written
17+
template: artifact-not-written
18+
- name: artifact-written
19+
container:
20+
image: argoproj/argosay:v2
21+
command:
22+
- sh
23+
- -c
24+
args:
25+
- |
26+
echo "something" > /tmp/present
27+
outputs:
28+
artifacts:
29+
- name: present
30+
path: /tmp/present
31+
- name: artifact-not-written
32+
container:
33+
image: argoproj/argosay:v2
34+
command:
35+
- sh
36+
- -c
37+
args:
38+
- |
39+
echo "intentionally not writing anything to disk"
40+
outputs:
41+
artifacts:
42+
- name: notpresent
43+
path: /tmp/notpresent

workflow/executor/executor.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -291,26 +291,27 @@ func (we *WorkflowExecutor) StageFiles() error {
291291
}
292292

293293
// SaveArtifacts uploads artifacts to the archive location
294-
func (we *WorkflowExecutor) SaveArtifacts(ctx context.Context) error {
294+
func (we *WorkflowExecutor) SaveArtifacts(ctx context.Context) (wfv1.Artifacts, error) {
295+
artifacts := wfv1.Artifacts{}
295296
if len(we.Template.Outputs.Artifacts) == 0 {
296297
log.Infof("No output artifacts")
297-
return nil
298+
return artifacts, nil
298299
}
299300

300301
log.Infof("Saving output artifacts")
301302
err := os.MkdirAll(tempOutArtDir, os.ModePerm)
302303
if err != nil {
303-
return argoerrs.InternalWrapError(err)
304+
return artifacts, argoerrs.InternalWrapError(err)
304305
}
305306

306-
for i, art := range we.Template.Outputs.Artifacts {
307+
for _, art := range we.Template.Outputs.Artifacts {
307308
err := we.saveArtifact(ctx, common.MainContainerName, &art)
308309
if err != nil {
309-
return err
310+
return artifacts, err
310311
}
311-
we.Template.Outputs.Artifacts[i] = art
312+
artifacts = append(artifacts, art)
312313
}
313-
return nil
314+
return artifacts, nil
314315
}
315316

316317
func (we *WorkflowExecutor) saveArtifact(ctx context.Context, containerName string, art *wfv1.Artifact) error {
@@ -832,9 +833,9 @@ func (we *WorkflowExecutor) InitializeOutput(ctx context.Context) {
832833
}
833834

834835
// ReportOutputs updates the WorkflowTaskResult (or falls back to annotate the Pod)
835-
func (we *WorkflowExecutor) ReportOutputs(ctx context.Context, logArtifacts []wfv1.Artifact) error {
836+
func (we *WorkflowExecutor) ReportOutputs(ctx context.Context, artifacts []wfv1.Artifact) error {
836837
outputs := we.Template.Outputs.DeepCopy()
837-
outputs.Artifacts = append(outputs.Artifacts, logArtifacts...)
838+
outputs.Artifacts = artifacts
838839
return we.reportResult(ctx, wfv1.NodeResult{Outputs: outputs})
839840
}
840841

workflow/executor/executor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,7 @@ func TestSaveArtifacts(t *testing.T) {
468468

469469
for _, tt := range tests {
470470
ctx := context.Background()
471-
err := tt.workflowExecutor.SaveArtifacts(ctx)
471+
_, err := tt.workflowExecutor.SaveArtifacts(ctx)
472472
if err != nil {
473473
assert.Equal(t, tt.expectError, true)
474474
continue

0 commit comments

Comments
 (0)