Skip to content

Commit d1c725c

Browse files
feat: Add validator for pipeline tags
- Add validation rule SVR00006 to verify all ingest pipeline processors have a valid tag - Add validation rule SVR00007 to verify all ingest pipeline processor tags are unique
1 parent 2183978 commit d1c725c

File tree

34 files changed

+724
-2
lines changed

34 files changed

+724
-2
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ to [JSON Schema](https://json-schema.org/), but with a couple of differences:
6868

6969
Expected package files, e.g. `manifest.yml` themselves have a structure to their contents. This structure is described in specification files using JSON schema (this is point 2. above). These specification files are also written as YAML for readability.
7070

71-
Note that the specification files primarily define the structure (syntax) of a package's contents. To a limited extent they may also define some semantics, e.g. enumeration values for certain fields. Richer semantics, however, will need to be expressed as validation code.
71+
Note that the specification files primarily define the structure (syntax) of a package's contents. To a limited extent they may also define some semantics, e.g. enumeration values for certain fields. Richer semantics, however, will need to be expressed as [validation code](docs/validations.md).
7272

7373
# Specification Versioning
7474

code/go/internal/validator/semantic/types.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,50 @@ func (r *runtimeField) UnmarshalYAML(value *yaml.Node) error {
9191
return r.unmarshalString(value.Value)
9292
}
9393

94+
type position struct {
95+
file string
96+
line int
97+
column int
98+
}
99+
100+
func (p position) String() string {
101+
return fmt.Sprintf("%s:%d:%d", p.file, p.line, p.column)
102+
}
103+
104+
type processor struct {
105+
Type string
106+
Attributes map[string]any
107+
OnFailure []*processor
108+
109+
position position
110+
}
111+
112+
func (p *processor) UnmarshalYAML(value *yaml.Node) error {
113+
var procMap map[string]struct {
114+
Attributes map[string]any `yaml:",inline"`
115+
OnFailure []*processor `yaml:"on_failure"`
116+
}
117+
if err := value.Decode(&procMap); err != nil {
118+
return err
119+
}
120+
121+
for k, v := range procMap {
122+
p.Type = k
123+
p.Attributes = v.Attributes
124+
p.OnFailure = v.OnFailure
125+
break
126+
}
127+
128+
p.position.line = value.Line
129+
p.position.column = value.Column
130+
131+
return nil
132+
}
133+
134+
type ingestPipeline struct {
135+
Processors []*processor `yaml:"processors"`
136+
}
137+
94138
type field struct {
95139
Name string `yaml:"name"`
96140
Type string `yaml:"type"`
@@ -111,6 +155,12 @@ type fieldFileMetadata struct {
111155
fullFilePath string
112156
}
113157

158+
type pipelineFileMetadata struct {
159+
dataStream string
160+
filePath string
161+
fullFilePath string
162+
}
163+
114164
type validateFunc func(fileMetadata fieldFileMetadata, f field) specerrors.ValidationErrors
115165

116166
func validateFields(fsys fspath.FS, validate validateFunc) specerrors.ValidationErrors {
@@ -219,6 +269,23 @@ func readFieldsFolder(fsys fspath.FS, fieldsDir string) ([]string, error) {
219269
return fieldsFiles, nil
220270
}
221271

272+
func readPipelinesFolder(fsys fspath.FS, pipelinesDir string) ([]string, error) {
273+
var pipelineFiles []string
274+
entries, err := fs.ReadDir(fsys, pipelinesDir)
275+
if errors.Is(err, os.ErrNotExist) {
276+
return []string{}, nil
277+
}
278+
if err != nil {
279+
return nil, fmt.Errorf("can't list pipelines directory (path: %s): %w", fsys.Path(pipelinesDir), err)
280+
}
281+
282+
for _, v := range entries {
283+
pipelineFiles = append(pipelineFiles, path.Join(pipelinesDir, v.Name()))
284+
}
285+
286+
return pipelineFiles, nil
287+
}
288+
222289
func unmarshalFields(fsys fspath.FS, fieldsPath string) (fields, error) {
223290
content, err := fs.ReadFile(fsys, fieldsPath)
224291
if err != nil {
@@ -248,3 +315,23 @@ func listDataStreams(fsys fspath.FS) ([]string, error) {
248315
}
249316
return list, nil
250317
}
318+
319+
func listPipelineFiles(fsys fspath.FS, dataStream string) ([]pipelineFileMetadata, error) {
320+
var pipelineFileMetadatas []pipelineFileMetadata
321+
322+
ingestPipelineDir := path.Join(dataStreamDir, dataStream, "elasticsearch", "ingest_pipeline")
323+
pipelineFiles, err := readPipelinesFolder(fsys, ingestPipelineDir)
324+
if err != nil {
325+
return nil, fmt.Errorf("cannot read pipeline files from integration package: %w", err)
326+
}
327+
328+
for _, file := range pipelineFiles {
329+
pipelineFileMetadatas = append(pipelineFileMetadatas, pipelineFileMetadata{
330+
filePath: file,
331+
fullFilePath: fsys.Path(file),
332+
dataStream: dataStream,
333+
})
334+
}
335+
336+
return pipelineFileMetadatas, nil
337+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package semantic
6+
7+
import (
8+
"fmt"
9+
"io/fs"
10+
11+
"gopkg.in/yaml.v3"
12+
13+
"github.com/elastic/package-spec/v3/code/go/internal/fspath"
14+
"github.com/elastic/package-spec/v3/code/go/pkg/specerrors"
15+
)
16+
17+
// ValidatePipelineTags validates ingest pipeline processor tags.
18+
func ValidatePipelineTags(fsys fspath.FS) specerrors.ValidationErrors {
19+
dataStreams, err := listDataStreams(fsys)
20+
if err != nil {
21+
return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
22+
}
23+
24+
var errors specerrors.ValidationErrors
25+
for _, dataStream := range dataStreams {
26+
pipelineFiles, err := listPipelineFiles(fsys, dataStream)
27+
if err != nil {
28+
return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
29+
}
30+
31+
for _, pipelineFile := range pipelineFiles {
32+
content, err := fs.ReadFile(fsys, pipelineFile.filePath)
33+
if err != nil {
34+
return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
35+
}
36+
37+
var pipeline ingestPipeline
38+
if err = yaml.Unmarshal(content, &pipeline); err != nil {
39+
return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
40+
}
41+
42+
if vErrs := validatePipelineTags(&pipeline, pipelineFile.fullFilePath); len(vErrs) > 0 {
43+
errors = append(errors, vErrs...)
44+
}
45+
}
46+
}
47+
48+
return errors
49+
}
50+
51+
func validatePipelineTags(pipeline *ingestPipeline, filename string) specerrors.ValidationErrors {
52+
var errors specerrors.ValidationErrors
53+
54+
seen := map[string]struct{}{}
55+
for _, proc := range pipeline.Processors {
56+
procErrors := checkPipelineTag(proc, seen, filename)
57+
errors = append(errors, procErrors...)
58+
}
59+
60+
return errors
61+
}
62+
63+
func checkPipelineTag(proc *processor, seen map[string]struct{}, filename string) specerrors.ValidationErrors {
64+
var errors specerrors.ValidationErrors
65+
66+
for _, subProc := range proc.OnFailure {
67+
subErrors := checkPipelineTag(subProc, seen, filename)
68+
errors = append(errors, subErrors...)
69+
}
70+
71+
raw, ok := proc.Attributes["tag"]
72+
if !ok {
73+
errors = append(errors, specerrors.NewStructuredError(fmt.Errorf("file %q is invalid: %s processor at line %d missing required tag", filename, proc.Type, proc.position.line), specerrors.CodePipelineTagRequired))
74+
return errors
75+
}
76+
77+
tag, ok := raw.(string)
78+
if !ok {
79+
errors = append(errors, specerrors.NewStructuredError(fmt.Errorf("file %q is invalid: %s processor at line %d has invalid tag value", filename, proc.Type, proc.position.line), specerrors.CodePipelineTagRequired))
80+
return errors
81+
}
82+
if tag == "" {
83+
errors = append(errors, specerrors.NewStructuredError(fmt.Errorf("file %q is invalid: %s processor at line %d has empty tag value", filename, proc.Type, proc.position.line), specerrors.CodePipelineTagRequired))
84+
return errors
85+
}
86+
87+
if _, dup := seen[tag]; dup {
88+
errors = append(errors, specerrors.NewStructuredError(fmt.Errorf("file %q is invalid: %s processor at line %d has duplicate tag value: %q", filename, proc.Type, proc.position.line, tag), specerrors.CodePipelineTagDuplicate))
89+
return errors
90+
}
91+
92+
seen[tag] = struct{}{}
93+
94+
return errors
95+
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package semantic
6+
7+
import (
8+
"testing"
9+
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
"gopkg.in/yaml.v3"
13+
)
14+
15+
func TestValidatePipelineTags(t *testing.T) {
16+
testCases := []struct {
17+
name string
18+
pipeline string
19+
errors []string
20+
}{
21+
{
22+
name: "good",
23+
pipeline: `
24+
processors:
25+
- set:
26+
tag: set_1
27+
field: key1
28+
value: value1
29+
- set:
30+
tag: set_2
31+
field: key2
32+
value: value2
33+
on_failure:
34+
- set:
35+
tag: onfail_1
36+
field: fail_key_1
37+
key: fail_value_1
38+
`,
39+
},
40+
{
41+
name: "missing-tag",
42+
pipeline: `
43+
processors:
44+
- set:
45+
field: key1
46+
value: value1
47+
on_failure:
48+
- set:
49+
tag: onfail_1
50+
field: fail_key_1
51+
key: fail_value_1
52+
`,
53+
errors: []string{
54+
`file "default.yml" is invalid: set processor at line 3 missing required tag (SVR00006)`,
55+
},
56+
},
57+
{
58+
name: "missing-tag-nested",
59+
pipeline: `
60+
processors:
61+
- set:
62+
tag: set_1
63+
field: key1
64+
value: value1
65+
on_failure:
66+
- set:
67+
field: fail_key_1
68+
value: fail_value_1
69+
`,
70+
errors: []string{
71+
`file "default.yml" is invalid: set processor at line 8 missing required tag (SVR00006)`,
72+
},
73+
},
74+
{
75+
name: "duplicate-tag",
76+
pipeline: `
77+
processors:
78+
- set:
79+
tag: set_1
80+
field: key1
81+
value: value1
82+
- set:
83+
tag: set_1
84+
field: key2
85+
value: value2
86+
`,
87+
errors: []string{
88+
`file "default.yml" is invalid: set processor at line 7 has duplicate tag value: "set_1" (SVR00007)`,
89+
},
90+
},
91+
{
92+
name: "duplicate-nested-tag",
93+
pipeline: `
94+
processors:
95+
- set:
96+
tag: set_1
97+
field: key1
98+
value: value1
99+
on_failure:
100+
- set:
101+
tag: set_1
102+
field: fail_key_1
103+
value: fail_value_1
104+
`,
105+
errors: []string{
106+
`file "default.yml" is invalid: set processor at line 3 has duplicate tag value: "set_1" (SVR00007)`,
107+
},
108+
},
109+
{
110+
name: "invalid-tag-value",
111+
pipeline: `
112+
processors:
113+
- set:
114+
tag: 1
115+
field: key1
116+
value: value1
117+
`,
118+
errors: []string{
119+
`file "default.yml" is invalid: set processor at line 3 has invalid tag value (SVR00006)`,
120+
},
121+
},
122+
{
123+
name: "empty-tag-value",
124+
pipeline: `
125+
processors:
126+
- set:
127+
tag: ''
128+
field: key1
129+
value: value1
130+
`,
131+
errors: []string{
132+
`file "default.yml" is invalid: set processor at line 3 has empty tag value (SVR00006)`,
133+
},
134+
},
135+
}
136+
137+
for _, tc := range testCases {
138+
t.Run(tc.name, func(t *testing.T) {
139+
var pipeline ingestPipeline
140+
err := yaml.Unmarshal([]byte(tc.pipeline), &pipeline)
141+
require.NoError(t, err)
142+
143+
errors := validatePipelineTags(&pipeline, "default.yml")
144+
assert.Len(t, errors, len(tc.errors))
145+
for _, err := range errors {
146+
assert.Contains(t, tc.errors, err.Error())
147+
}
148+
})
149+
}
150+
}

code/go/internal/validator/spec.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ func (s Spec) rules(pkgType string, rootSpec spectypes.ItemSpec) validationRules
220220
{fn: semantic.ValidateInputPackagesPolicyTemplates, types: []string{"input"}},
221221
{fn: semantic.ValidateMinimumAgentVersion},
222222
{fn: semantic.ValidateIntegrationPolicyTemplates, types: []string{"integration"}},
223+
{fn: semantic.ValidatePipelineTags, types: []string{"integration"}, since: semver.MustParse("3.6.0")},
223224
}
224225

225226
var validationRules validationRules

code/go/pkg/specerrors/constants.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,6 @@ const (
2121
CodeKibanaDanglingObjectsIDs = "SVR00003"
2222
CodeVisualizationByValue = "SVR00004"
2323
CodeMinimumKibanaVersion = "SVR00005"
24+
CodePipelineTagRequired = "SVR00006"
25+
CodePipelineTagDuplicate = "SVR00007"
2426
)

code/go/pkg/validator/validator_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -829,6 +829,12 @@ func TestValidateIngestPipelines(t *testing.T) {
829829
"field processors.2.remove.if: rename \"message\" to \"event.original\" processor requires remove \"message\" processor with if: 'ctx.event?.original != null' (JSE00001)",
830830
},
831831
},
832+
"bad_pipeline_tags": {
833+
"example": []string{
834+
"set processor at line 4 missing required tag (SVR00006)",
835+
"set processor at line 15 has duplicate tag value: \"set_sample_field\" (SVR00007)",
836+
},
837+
},
832838
}
833839

834840
for pkgName, pipelines := range tests {

0 commit comments

Comments
 (0)