Skip to content

Commit b2d626a

Browse files
feat: Add validator for pipeline tags
- Add validation rule SVR00006 to verify all ingest pipeline processors have a valid tag - Add validation rule SVR00007 to verify all ingest pipeline processor tags are unique
1 parent 3897ce0 commit b2d626a

File tree

6 files changed

+329
-0
lines changed

6 files changed

+329
-0
lines changed

code/go/internal/validator/semantic/types.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,50 @@ func (r *runtimeField) UnmarshalYAML(value *yaml.Node) error {
9191
return r.unmarshalString(value.Value)
9292
}
9393

94+
type position struct {
95+
file string
96+
line int
97+
column int
98+
}
99+
100+
func (p position) String() string {
101+
return fmt.Sprintf("%s:%d:%d", p.file, p.line, p.column)
102+
}
103+
104+
type processor struct {
105+
Type string
106+
Attributes map[string]any
107+
OnFailure []*processor
108+
109+
position position
110+
}
111+
112+
func (p *processor) UnmarshalYAML(value *yaml.Node) error {
113+
var procMap map[string]struct {
114+
Attributes map[string]any `yaml:",inline"`
115+
OnFailure []*processor `yaml:"on_failure"`
116+
}
117+
if err := value.Decode(&procMap); err != nil {
118+
return err
119+
}
120+
121+
for k, v := range procMap {
122+
p.Type = k
123+
p.Attributes = v.Attributes
124+
p.OnFailure = v.OnFailure
125+
break
126+
}
127+
128+
p.position.line = value.Line
129+
p.position.column = value.Column
130+
131+
return nil
132+
}
133+
134+
type ingestPipeline struct {
135+
Processors []*processor `yaml:"processors"`
136+
}
137+
94138
type field struct {
95139
Name string `yaml:"name"`
96140
Type string `yaml:"type"`
@@ -111,6 +155,12 @@ type fieldFileMetadata struct {
111155
fullFilePath string
112156
}
113157

158+
type pipelineFileMetadata struct {
159+
dataStream string
160+
filePath string
161+
fullFilePath string
162+
}
163+
114164
type validateFunc func(fileMetadata fieldFileMetadata, f field) specerrors.ValidationErrors
115165

116166
func validateFields(fsys fspath.FS, validate validateFunc) specerrors.ValidationErrors {
@@ -219,6 +269,23 @@ func readFieldsFolder(fsys fspath.FS, fieldsDir string) ([]string, error) {
219269
return fieldsFiles, nil
220270
}
221271

272+
func readPipelinesFolder(fsys fspath.FS, pipelinesDir string) ([]string, error) {
273+
var pipelineFiles []string
274+
entries, err := fs.ReadDir(fsys, pipelinesDir)
275+
if errors.Is(err, os.ErrNotExist) {
276+
return []string{}, nil
277+
}
278+
if err != nil {
279+
return nil, fmt.Errorf("can't list pipelines directory (path: %s): %w", fsys.Path(pipelinesDir), err)
280+
}
281+
282+
for _, v := range entries {
283+
pipelineFiles = append(pipelineFiles, path.Join(pipelinesDir, v.Name()))
284+
}
285+
286+
return pipelineFiles, nil
287+
}
288+
222289
func unmarshalFields(fsys fspath.FS, fieldsPath string) (fields, error) {
223290
content, err := fs.ReadFile(fsys, fieldsPath)
224291
if err != nil {
@@ -248,3 +315,23 @@ func listDataStreams(fsys fspath.FS) ([]string, error) {
248315
}
249316
return list, nil
250317
}
318+
319+
func listPipelineFiles(fsys fspath.FS, dataStream string) ([]pipelineFileMetadata, error) {
320+
var pipelineFileMetadatas []pipelineFileMetadata
321+
322+
ingestPipelineDir := path.Join(dataStreamDir, "elasticsearch", "ingest_pipeline")
323+
pipelineFiles, err := readPipelinesFolder(fsys, ingestPipelineDir)
324+
if err != nil {
325+
return nil, fmt.Errorf("cannot read pipeline files from integration package: %w", err)
326+
}
327+
328+
for _, file := range pipelineFiles {
329+
pipelineFileMetadatas = append(pipelineFileMetadatas, pipelineFileMetadata{
330+
filePath: file,
331+
fullFilePath: fsys.Path(file),
332+
dataStream: dataStream,
333+
})
334+
}
335+
336+
return pipelineFileMetadatas, nil
337+
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package semantic
2+
3+
import (
4+
"fmt"
5+
"io/fs"
6+
7+
"gopkg.in/yaml.v3"
8+
9+
"github.com/elastic/package-spec/v3/code/go/internal/fspath"
10+
"github.com/elastic/package-spec/v3/code/go/pkg/specerrors"
11+
)
12+
13+
func ValidatePipelineTags(fsys fspath.FS) specerrors.ValidationErrors {
14+
dataStreams, err := listDataStreams(fsys)
15+
if err != nil {
16+
return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
17+
}
18+
19+
var errors specerrors.ValidationErrors
20+
for _, dataStream := range dataStreams {
21+
pipelineFiles, err := listPipelineFiles(fsys, dataStream)
22+
if err != nil {
23+
return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
24+
}
25+
26+
for _, pipelineFile := range pipelineFiles {
27+
content, err := fs.ReadFile(fsys, pipelineFile.filePath)
28+
if err != nil {
29+
return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
30+
}
31+
32+
var pipeline ingestPipeline
33+
if err = yaml.Unmarshal(content, &pipeline); err != nil {
34+
return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
35+
}
36+
37+
if vErrs := validatePipelineTags(&pipeline); len(vErrs) > 0 {
38+
errors = append(errors, vErrs...)
39+
}
40+
}
41+
}
42+
43+
return errors
44+
}
45+
46+
func validatePipelineTags(pipeline *ingestPipeline) specerrors.ValidationErrors {
47+
var errors specerrors.ValidationErrors
48+
49+
seen := map[string]struct{}{}
50+
for _, proc := range pipeline.Processors {
51+
procErrors := checkPipelineTag(proc, seen)
52+
errors = append(errors, procErrors...)
53+
}
54+
55+
return errors
56+
}
57+
58+
func checkPipelineTag(proc *processor, seen map[string]struct{}) specerrors.ValidationErrors {
59+
var errors specerrors.ValidationErrors
60+
61+
for _, subProc := range proc.OnFailure {
62+
subErrors := checkPipelineTag(subProc, seen)
63+
errors = append(errors, subErrors...)
64+
}
65+
66+
raw, ok := proc.Attributes["tag"]
67+
if !ok {
68+
errors = append(errors, specerrors.NewStructuredError(fmt.Errorf("%s processor at line %d missing required tag", proc.Type, proc.position.line), specerrors.CodePipelineTagRequired))
69+
return errors
70+
}
71+
72+
tag, ok := raw.(string)
73+
if !ok {
74+
errors = append(errors, specerrors.NewStructuredError(fmt.Errorf("%s processor at line %d has invalid tag value", proc.Type, proc.position.line), specerrors.CodePipelineTagRequired))
75+
return errors
76+
}
77+
if tag == "" {
78+
errors = append(errors, specerrors.NewStructuredError(fmt.Errorf("%s processor at line %d has empty tag value", proc.Type, proc.position.line), specerrors.CodePipelineTagRequired))
79+
return errors
80+
}
81+
82+
if _, dup := seen[tag]; dup {
83+
errors = append(errors, specerrors.NewStructuredError(fmt.Errorf("%s processor at line %d has duplicate tag value: %q", proc.Type, proc.position.line, tag), specerrors.CodePipelineTagDuplicate))
84+
return errors
85+
}
86+
87+
seen[tag] = struct{}{}
88+
89+
return errors
90+
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package semantic
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
"github.com/stretchr/testify/require"
8+
"gopkg.in/yaml.v3"
9+
)
10+
11+
func TestValidatePipelineTags(t *testing.T) {
12+
testCases := []struct {
13+
name string
14+
pipeline string
15+
errors []string
16+
}{
17+
{
18+
name: "good",
19+
pipeline: `
20+
processors:
21+
- set:
22+
tag: set_1
23+
field: key1
24+
value: value1
25+
- set:
26+
tag: set_2
27+
field: key2
28+
value: value2
29+
on_failure:
30+
- set:
31+
tag: onfail_1
32+
field: fail_key_1
33+
key: fail_value_1
34+
`,
35+
},
36+
{
37+
name: "missing-tag",
38+
pipeline: `
39+
processors:
40+
- set:
41+
field: key1
42+
value: value1
43+
on_failure:
44+
- set:
45+
tag: onfail_1
46+
field: fail_key_1
47+
key: fail_value_1
48+
`,
49+
errors: []string{
50+
`set processor at line 3 missing required tag (SVR00006)`,
51+
},
52+
},
53+
{
54+
name: "missing-tag-nested",
55+
pipeline: `
56+
processors:
57+
- set:
58+
tag: set_1
59+
field: key1
60+
value: value1
61+
on_failure:
62+
- set:
63+
field: fail_key_1
64+
value: fail_value_1
65+
`,
66+
errors: []string{
67+
`set processor at line 8 missing required tag (SVR00006)`,
68+
},
69+
},
70+
{
71+
name: "duplicate-tag",
72+
pipeline: `
73+
processors:
74+
- set:
75+
tag: set_1
76+
field: key1
77+
value: value1
78+
- set:
79+
tag: set_1
80+
field: key2
81+
value: value2
82+
`,
83+
errors: []string{
84+
`set processor at line 7 has duplicate tag value: "set_1" (SVR00007)`,
85+
},
86+
},
87+
{
88+
name: "duplicate-nested-tag",
89+
pipeline: `
90+
processors:
91+
- set:
92+
tag: set_1
93+
field: key1
94+
value: value1
95+
on_failure:
96+
- set:
97+
tag: set_1
98+
field: fail_key_1
99+
value: fail_value_1
100+
`,
101+
errors: []string{
102+
`set processor at line 3 has duplicate tag value: "set_1" (SVR00007)`,
103+
},
104+
},
105+
{
106+
name: "invalid-tag-value",
107+
pipeline: `
108+
processors:
109+
- set:
110+
tag: 1
111+
field: key1
112+
value: value1
113+
`,
114+
errors: []string{
115+
`set processor at line 3 has invalid tag value (SVR00006)`,
116+
},
117+
},
118+
{
119+
name: "empty-tag-value",
120+
pipeline: `
121+
processors:
122+
- set:
123+
tag: ''
124+
field: key1
125+
value: value1
126+
`,
127+
errors: []string{
128+
`set processor at line 3 has empty tag value (SVR00006)`,
129+
},
130+
},
131+
}
132+
133+
for _, tc := range testCases {
134+
t.Run(tc.name, func(t *testing.T) {
135+
var pipeline ingestPipeline
136+
err := yaml.Unmarshal([]byte(tc.pipeline), &pipeline)
137+
require.NoError(t, err)
138+
139+
errors := validatePipelineTags(&pipeline)
140+
assert.Len(t, errors, len(tc.errors))
141+
for _, err := range errors {
142+
assert.Contains(t, tc.errors, err.Error())
143+
}
144+
})
145+
}
146+
}

code/go/internal/validator/spec.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ func (s Spec) rules(pkgType string, rootSpec spectypes.ItemSpec) validationRules
217217
{fn: semantic.ValidateDocsStructure},
218218
{fn: semantic.ValidateDeploymentModes, types: []string{"integration"}},
219219
{fn: semantic.ValidateDurationVariables, since: semver.MustParse("3.5.0")},
220+
{fn: semantic.ValidatePipelineTags, types: []string{"integration"}, since: semver.MustParse("3.6.0")},
220221
}
221222

222223
var validationRules validationRules

code/go/pkg/specerrors/constants.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,6 @@ const (
2121
CodeKibanaDanglingObjectsIDs = "SVR00003"
2222
CodeVisualizationByValue = "SVR00004"
2323
CodeMinimumKibanaVersion = "SVR00005"
24+
CodePipelineTagRequired = "SVR00006"
25+
CodePipelineTagDuplicate = "SVR00007"
2426
)

spec/changelog.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
- description: Add support for semantic_text field definition.
1313
type: enhancement
1414
link: https://github.com/elastic/package-spec/pull/807
15+
- description: Add pipeline tag validations.
16+
type: enhancement
17+
link: https://github.com/elastic/package-spec/pull/1
1518
- version: 3.5.0
1619
changes:
1720
- description: Add `duration` variable data type with `min_duration` and `max_duration` validation properties.

0 commit comments

Comments
 (0)