feat: Add validator for pipeline tags #1010
New file (diff @@ -0,0 +1,95 @@):

```go
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package semantic

import (
	"fmt"
	"io/fs"

	"gopkg.in/yaml.v3"

	"github.com/elastic/package-spec/v3/code/go/internal/fspath"
	"github.com/elastic/package-spec/v3/code/go/pkg/specerrors"
)

// ValidatePipelineTags validates ingest pipeline processor tags.
func ValidatePipelineTags(fsys fspath.FS) specerrors.ValidationErrors {
	dataStreams, err := listDataStreams(fsys)
	if err != nil {
		return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
	}

	var errors specerrors.ValidationErrors
	for _, dataStream := range dataStreams {
		pipelineFiles, err := listPipelineFiles(fsys, dataStream)
		if err != nil {
			return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
		}

		for _, pipelineFile := range pipelineFiles {
			content, err := fs.ReadFile(fsys, pipelineFile.filePath)
			if err != nil {
				return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
			}

			var pipeline ingestPipeline
			if err = yaml.Unmarshal(content, &pipeline); err != nil {
				return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
			}

			if vErrs := validatePipelineTags(&pipeline, pipelineFile.fullFilePath); len(vErrs) > 0 {
				errors = append(errors, vErrs...)
			}
		}
	}

	return errors
}

func validatePipelineTags(pipeline *ingestPipeline, filename string) specerrors.ValidationErrors {
	var errors specerrors.ValidationErrors

	seen := map[string]struct{}{}
	for _, proc := range pipeline.Processors {
		procErrors := checkPipelineTag(&proc, seen, filename)
		errors = append(errors, procErrors...)
	}

	return errors
}

func checkPipelineTag(proc *processor, seen map[string]struct{}, filename string) specerrors.ValidationErrors {
	var errors specerrors.ValidationErrors

	for _, subProc := range proc.OnFailure {
		subErrors := checkPipelineTag(&subProc, seen, filename)
		errors = append(errors, subErrors...)
	}

	raw, ok := proc.Attributes["tag"]
```
Review thread on this line:

Contributor:
Looking into the pipeline spec: am I right assuming the definition should be here? If so, could we control the required validation version from the JSON patch rules?

Contributor (Author):
Yeah, I noticed that. In this case, though, …

Member:
Yes, you are right, it would be better to handle this from the JSON schema if possible, though as implemented here it is easier to make it skippable and to provide better error messages. If brought to the schema we'd need to add rules to … Maybe an option would be to use both: validate that there are tags in the spec, and check with a semantic rule that tags are not duplicated.

Member:
Well, this seems to be easy too: https://json-schema.org/understanding-json-schema/reference/array#uniqueItems. @taylor-swanson wdyt about rewriting this with the spec?

Contributor (Author):
It's certainly an interesting idea... How would that work, though, since … it ends up being …

Member:
You are right, it would be complex to check unique items in different objects... I guess this part would still need to be in code.
```go
	if !ok {
		errors = append(errors, specerrors.NewStructuredError(fmt.Errorf("file %q is invalid: %s processor at line %d missing required tag", filename, proc.Type, proc.position.line), specerrors.CodePipelineTagRequired))
		return errors
	}

	tag, ok := raw.(string)
	if !ok {
		errors = append(errors, specerrors.NewStructuredError(fmt.Errorf("file %q is invalid: %s processor at line %d has invalid tag value", filename, proc.Type, proc.position.line), specerrors.CodePipelineTagRequired))
		return errors
	}
	if tag == "" {
		errors = append(errors, specerrors.NewStructuredError(fmt.Errorf("file %q is invalid: %s processor at line %d has empty tag value", filename, proc.Type, proc.position.line), specerrors.CodePipelineTagRequired))
		return errors
	}

	if _, dup := seen[tag]; dup {
		errors = append(errors, specerrors.NewStructuredErrorf("file %q is invalid: %s processor at line %d has duplicate tag value: %q", filename, proc.Type, proc.position.line, tag))
		return errors
	}

	seen[tag] = struct{}{}

	return errors
}
```
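The types and helpers this validator leans on (`ingestPipeline`, `processor`, `listDataStreams`, `listPipelineFiles`) are defined elsewhere in the PR and are not shown in this diff. As a rough sketch of the shapes the code above assumes, and not the PR's actual implementation, a processor could be decoded through gopkg.in/yaml.v3's node API so that the type name, raw attributes, nested on_failure processors, and starting line number are all captured:

```go
// Hypothetical sketch, not the PR's actual code: the ingestPipeline and
// processor shapes assumed by the validator above, decoded with
// gopkg.in/yaml.v3 so that source line numbers survive unmarshalling.
package semantic

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

type ingestPipeline struct {
	Processors []processor `yaml:"processors"`
}

type position struct {
	line int
}

// processor holds one pipeline processor: its type name (e.g. "set"), its raw
// attributes, any nested on_failure processors, and the line it starts on.
type processor struct {
	Type       string
	Attributes map[string]interface{}
	OnFailure  []processor
	position   position
}

// UnmarshalYAML decodes a single-key mapping such as `- set: {...}`.
func (p *processor) UnmarshalYAML(node *yaml.Node) error {
	if node.Kind != yaml.MappingNode || len(node.Content) < 2 {
		return fmt.Errorf("expected a processor mapping at line %d", node.Line)
	}
	p.Type = node.Content[0].Value
	p.position.line = node.Line

	body := node.Content[1]
	if err := body.Decode(&p.Attributes); err != nil {
		return err
	}

	// Decode nested on_failure processors recursively so checkPipelineTag can
	// walk them; Content alternates key and value nodes in a mapping.
	for i := 0; i+1 < len(body.Content); i += 2 {
		if body.Content[i].Value == "on_failure" {
			if err := body.Content[i+1].Decode(&p.OnFailure); err != nil {
				return err
			}
		}
	}
	return nil
}
```

Decoding via *yaml.Node rather than plain struct tags is what would make line numbers such as "set processor at line 3" available to the error messages; a plain struct unmarshal discards them.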
New test file (diff @@ -0,0 +1,150 @@):

```go
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package semantic

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"gopkg.in/yaml.v3"
)

func TestValidatePipelineTags(t *testing.T) {
	testCases := []struct {
		name     string
		pipeline string
		errors   []string
	}{
		{
			name: "good",
			pipeline: `
processors:
  - set:
      tag: set_1
      field: key1
      value: value1
  - set:
      tag: set_2
      field: key2
      value: value2
      on_failure:
        - set:
            tag: onfail_1
            field: fail_key_1
            key: fail_value_1
`,
		},
		{
			name: "missing-tag",
			pipeline: `
processors:
  - set:
      field: key1
      value: value1
      on_failure:
        - set:
            tag: onfail_1
            field: fail_key_1
            key: fail_value_1
`,
			errors: []string{
				`file "default.yml" is invalid: set processor at line 3 missing required tag (SVR00006)`,
			},
		},
		{
			name: "missing-tag-nested",
			pipeline: `
processors:
  - set:
      tag: set_1
      field: key1
      value: value1
      on_failure:
        - set:
            field: fail_key_1
            value: fail_value_1
`,
			errors: []string{
				`file "default.yml" is invalid: set processor at line 8 missing required tag (SVR00006)`,
			},
		},
		{
			name: "duplicate-tag",
			pipeline: `
processors:
  - set:
      tag: set_1
      field: key1
      value: value1
  - set:
      tag: set_1
      field: key2
      value: value2
`,
			errors: []string{
				`file "default.yml" is invalid: set processor at line 7 has duplicate tag value: "set_1"`,
			},
		},
		{
			name: "duplicate-nested-tag",
			pipeline: `
processors:
  - set:
      tag: set_1
      field: key1
      value: value1
      on_failure:
        - set:
            tag: set_1
            field: fail_key_1
            value: fail_value_1
`,
			errors: []string{
				`file "default.yml" is invalid: set processor at line 3 has duplicate tag value: "set_1"`,
			},
		},
		{
			name: "invalid-tag-value",
			pipeline: `
processors:
  - set:
      tag: 1
      field: key1
      value: value1
`,
			errors: []string{
				`file "default.yml" is invalid: set processor at line 3 has invalid tag value (SVR00006)`,
			},
		},
		{
			name: "empty-tag-value",
			pipeline: `
processors:
  - set:
      tag: ''
      field: key1
      value: value1
`,
			errors: []string{
				`file "default.yml" is invalid: set processor at line 3 has empty tag value (SVR00006)`,
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			var pipeline ingestPipeline
			err := yaml.Unmarshal([]byte(tc.pipeline), &pipeline)
			require.NoError(t, err)

			errors := validatePipelineTags(&pipeline, "default.yml")
			assert.Len(t, errors, len(tc.errors))
			for _, err := range errors {
				assert.Contains(t, tc.errors, err.Error())
			}
		})
	}
}
```