Skip to content

Commit 0e3ba16

Browse files
fixes
1 parent 6887b8b commit 0e3ba16

File tree

23 files changed

+311
-17
lines changed

23 files changed

+311
-17
lines changed

code/go/internal/validator/semantic/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ func listDataStreams(fsys fspath.FS) ([]string, error) {
319319
func listPipelineFiles(fsys fspath.FS, dataStream string) ([]pipelineFileMetadata, error) {
320320
var pipelineFileMetadatas []pipelineFileMetadata
321321

322-
ingestPipelineDir := path.Join(dataStreamDir, "elasticsearch", "ingest_pipeline")
322+
ingestPipelineDir := path.Join(dataStreamDir, dataStream, "elasticsearch", "ingest_pipeline")
323323
pipelineFiles, err := readPipelinesFolder(fsys, ingestPipelineDir)
324324
if err != nil {
325325
return nil, fmt.Errorf("cannot read pipeline files from integration package: %w", err)

code/go/internal/validator/semantic/validate_pipeline_tags.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func ValidatePipelineTags(fsys fspath.FS) specerrors.ValidationErrors {
3939
return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
4040
}
4141

42-
if vErrs := validatePipelineTags(&pipeline); len(vErrs) > 0 {
42+
if vErrs := validatePipelineTags(&pipeline, pipelineFile.fullFilePath); len(vErrs) > 0 {
4343
errors = append(errors, vErrs...)
4444
}
4545
}
@@ -48,44 +48,44 @@ func ValidatePipelineTags(fsys fspath.FS) specerrors.ValidationErrors {
4848
return errors
4949
}
5050

51-
func validatePipelineTags(pipeline *ingestPipeline) specerrors.ValidationErrors {
51+
func validatePipelineTags(pipeline *ingestPipeline, filename string) specerrors.ValidationErrors {
5252
var errors specerrors.ValidationErrors
5353

5454
seen := map[string]struct{}{}
5555
for _, proc := range pipeline.Processors {
56-
procErrors := checkPipelineTag(proc, seen)
56+
procErrors := checkPipelineTag(proc, seen, filename)
5757
errors = append(errors, procErrors...)
5858
}
5959

6060
return errors
6161
}
6262

63-
func checkPipelineTag(proc *processor, seen map[string]struct{}) specerrors.ValidationErrors {
63+
func checkPipelineTag(proc *processor, seen map[string]struct{}, filename string) specerrors.ValidationErrors {
6464
var errors specerrors.ValidationErrors
6565

6666
for _, subProc := range proc.OnFailure {
67-
subErrors := checkPipelineTag(subProc, seen)
67+
subErrors := checkPipelineTag(subProc, seen, filename)
6868
errors = append(errors, subErrors...)
6969
}
7070

7171
raw, ok := proc.Attributes["tag"]
7272
if !ok {
73-
errors = append(errors, specerrors.NewStructuredError(fmt.Errorf("%s processor at line %d missing required tag", proc.Type, proc.position.line), specerrors.CodePipelineTagRequired))
73+
errors = append(errors, specerrors.NewStructuredError(fmt.Errorf("file %q is invalid: %s processor at line %d missing required tag", filename, proc.Type, proc.position.line), specerrors.CodePipelineTagRequired))
7474
return errors
7575
}
7676

7777
tag, ok := raw.(string)
7878
if !ok {
79-
errors = append(errors, specerrors.NewStructuredError(fmt.Errorf("%s processor at line %d has invalid tag value", proc.Type, proc.position.line), specerrors.CodePipelineTagRequired))
79+
errors = append(errors, specerrors.NewStructuredError(fmt.Errorf("file %q is invalid: %s processor at line %d has invalid tag value", filename, proc.Type, proc.position.line), specerrors.CodePipelineTagRequired))
8080
return errors
8181
}
8282
if tag == "" {
83-
errors = append(errors, specerrors.NewStructuredError(fmt.Errorf("%s processor at line %d has empty tag value", proc.Type, proc.position.line), specerrors.CodePipelineTagRequired))
83+
errors = append(errors, specerrors.NewStructuredError(fmt.Errorf("file %q is invalid: %s processor at line %d has empty tag value", filename, proc.Type, proc.position.line), specerrors.CodePipelineTagRequired))
8484
return errors
8585
}
8686

8787
if _, dup := seen[tag]; dup {
88-
errors = append(errors, specerrors.NewStructuredError(fmt.Errorf("%s processor at line %d has duplicate tag value: %q", proc.Type, proc.position.line, tag), specerrors.CodePipelineTagDuplicate))
88+
errors = append(errors, specerrors.NewStructuredError(fmt.Errorf("file %q is invalid: %s processor at line %d has duplicate tag value: %q", filename, proc.Type, proc.position.line, tag), specerrors.CodePipelineTagDuplicate))
8989
return errors
9090
}
9191

code/go/internal/validator/semantic/validate_pipeline_tags_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ processors:
5151
key: fail_value_1
5252
`,
5353
errors: []string{
54-
`set processor at line 3 missing required tag (SVR00006)`,
54+
`file "default.yml" is invalid: set processor at line 3 missing required tag (SVR00006)`,
5555
},
5656
},
5757
{
@@ -68,7 +68,7 @@ processors:
6868
value: fail_value_1
6969
`,
7070
errors: []string{
71-
`set processor at line 8 missing required tag (SVR00006)`,
71+
`file "default.yml" is invalid: set processor at line 8 missing required tag (SVR00006)`,
7272
},
7373
},
7474
{
@@ -85,7 +85,7 @@ processors:
8585
value: value2
8686
`,
8787
errors: []string{
88-
`set processor at line 7 has duplicate tag value: "set_1" (SVR00007)`,
88+
`file "default.yml" is invalid: set processor at line 7 has duplicate tag value: "set_1" (SVR00007)`,
8989
},
9090
},
9191
{
@@ -103,7 +103,7 @@ processors:
103103
value: fail_value_1
104104
`,
105105
errors: []string{
106-
`set processor at line 3 has duplicate tag value: "set_1" (SVR00007)`,
106+
`file "default.yml" is invalid: set processor at line 3 has duplicate tag value: "set_1" (SVR00007)`,
107107
},
108108
},
109109
{
@@ -116,7 +116,7 @@ processors:
116116
value: value1
117117
`,
118118
errors: []string{
119-
`set processor at line 3 has invalid tag value (SVR00006)`,
119+
`file "default.yml" is invalid: set processor at line 3 has invalid tag value (SVR00006)`,
120120
},
121121
},
122122
{
@@ -129,7 +129,7 @@ processors:
129129
value: value1
130130
`,
131131
errors: []string{
132-
`set processor at line 3 has empty tag value (SVR00006)`,
132+
`file "default.yml" is invalid: set processor at line 3 has empty tag value (SVR00006)`,
133133
},
134134
},
135135
}
@@ -140,7 +140,7 @@ processors:
140140
err := yaml.Unmarshal([]byte(tc.pipeline), &pipeline)
141141
require.NoError(t, err)
142142

143-
errors := validatePipelineTags(&pipeline)
143+
errors := validatePipelineTags(&pipeline, "default.yml")
144144
assert.Len(t, errors, len(tc.errors))
145145
for _, err := range errors {
146146
assert.Contains(t, tc.errors, err.Error())

code/go/pkg/validator/validator_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -829,6 +829,12 @@ func TestValidateIngestPipelines(t *testing.T) {
829829
"field processors.2.remove.if: rename \"message\" to \"event.original\" processor requires remove \"message\" processor with if: 'ctx.event?.original != null' (JSE00001)",
830830
},
831831
},
832+
"bad_pipeline_tags": {
833+
"example": []string{
834+
"set processor at line 4 missing required tag (SVR00006)",
835+
"set processor at line 15 has duplicate tag value: \"set_sample_field\" (SVR00007)",
836+
},
837+
},
832838
}
833839

834840
for pkgName, pipelines := range tests {

test/packages/bad_agent_version_v3/data_stream/agent_settings/elasticsearch/ingest_pipeline/default.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
description: Pipeline for processing sample logs
33
processors:
44
- set:
5+
tag: set_sample_field
56
field: sample_field
67
value: "1"
78
on_failure:

test/packages/bad_agent_version_v3/data_stream/ecs_import_mappings/elasticsearch/ingest_pipeline/default.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
description: Pipeline for processing sample logs
33
processors:
44
- set:
5+
tag: set_sample_field
56
field: sample_field
67
value: "1"
78
on_failure:

test/packages/bad_agent_version_v3/data_stream/foo/elasticsearch/ingest_pipeline/default.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ description: Pipeline for Microsoft DHCP
33

44
processors:
55
- rename:
6+
tag: rename_foo_to_message
67
field: foo
78
target_field: message
89
ignore_missing: true

test/packages/bad_agent_version_v3/data_stream/rename_message/elasticsearch/ingest_pipeline/default.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ description: Test pipeline with renaming processor
33
processors:
44
# cf. https://github.com/elastic/integrations/pull/7026/files#diff-dafc5693f21abb74b294987b502cdc5770f22cccff3c5a8fee0d48510791b811
55
- rename:
6+
tag: rename_message_to_event_original
67
field: message
78
target_field: event.original
89
ignore_missing: true
910
if: 'ctx.event?.original == null'
1011
description: 'Renames the original `message` field to `event.original` to store a copy of the original message. The `event.original` field is not touched if the document already has one; it may happen when Logstash sends the document.'
1112
- remove:
13+
tag: remove_message
1214
field: message
1315
ignore_missing: true
1416
if: 'ctx.event?.original != null'

test/packages/bad_agent_version_v3/data_stream/root/elasticsearch/ingest_pipeline/default.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
description: Pipeline for processing sample logs
33
processors:
44
- set:
5+
tag: set_sample_field
56
field: sample_field
67
value: "1"
78
on_failure:

test/packages/bad_agent_version_v3/data_stream/routing_rules/elasticsearch/ingest_pipeline/default.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
description: Pipeline for processing sample logs
33
processors:
44
- set:
5+
tag: set_sample_field
56
field: sample_field
67
value: "1"
78
on_failure:

0 commit comments

Comments
 (0)