Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ to [JSON Schema](https://json-schema.org/), but with a couple of differences:

Expected package files, e.g. `manifest.yml` themselves have a structure to their contents. This structure is described in specification files using JSON schema (this is point 2. above). These specification files are also written as YAML for readability.

Note that the specification files primarily define the structure (syntax) of a package's contents. To a limited extent they may also define some semantics, e.g. enumeration values for certain fields. Richer semantics, however, will need to be expressed as validation code.
Note that the specification files primarily define the structure (syntax) of a package's contents. To a limited extent they may also define some semantics, e.g. enumeration values for certain fields. Richer semantics, however, will need to be expressed as [validation code](docs/validations.md).

# Specification Versioning

Expand Down
88 changes: 88 additions & 0 deletions code/go/internal/validator/semantic/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,50 @@ func (r *runtimeField) UnmarshalYAML(value *yaml.Node) error {
return r.unmarshalString(value.Value)
}

// position identifies a location inside a package file.
type position struct {
	file   string
	line   int
	column int
}

// String renders the position in the conventional "file:line:column" form.
func (p position) String() string {
	loc := fmt.Sprintf("%d:%d", p.line, p.column)
	return p.file + ":" + loc
}

// processor is a single ingest pipeline processor, i.e. one entry of a
// pipeline's "processors" (or nested "on_failure") list.
type processor struct {
	// Type is the processor name, taken from the single key of the YAML
	// mapping (e.g. "set", "rename").
	Type string
	// Attributes holds the processor's configuration options as decoded
	// from the mapping body (inline fields).
	Attributes map[string]any
	// OnFailure holds the processors of this processor's on_failure error
	// handler, if any.
	OnFailure []processor

	// position records where the processor appears in the source file.
	// Only line and column are populated by UnmarshalYAML; file is left
	// empty there.
	position position
}

// UnmarshalYAML decodes a processor from its YAML representation: a mapping
// with a single key (the processor type) whose value holds the processor
// attributes and an optional on_failure handler. The node's line and column
// are recorded so validation errors can point at the source location.
func (p *processor) UnmarshalYAML(value *yaml.Node) error {
	type body struct {
		Attributes map[string]any `yaml:",inline"`
		OnFailure  []processor    `yaml:"on_failure"`
	}

	var decoded map[string]body
	if err := value.Decode(&decoded); err != nil {
		return err
	}

	// A well-formed processor mapping has exactly one key; take the first
	// entry found and ignore any others.
	for name, b := range decoded {
		p.Type = name
		p.Attributes = b.Attributes
		p.OnFailure = b.OnFailure
		break
	}

	p.position.line = value.Line
	p.position.column = value.Column

	return nil
}

// ingestPipeline is the subset of an Elasticsearch ingest pipeline
// definition needed for validation: its list of processors. Other pipeline
// fields are ignored when decoding.
type ingestPipeline struct {
	Processors []processor `yaml:"processors"`
}

type field struct {
Name string `yaml:"name"`
Type string `yaml:"type"`
Expand All @@ -112,6 +156,12 @@ type fieldFileMetadata struct {
fullFilePath string
}

// pipelineFileMetadata identifies an ingest pipeline file within a package.
type pipelineFileMetadata struct {
	// dataStream is the name of the data stream the pipeline belongs to.
	dataStream string
	// filePath is the package-relative path of the pipeline file.
	filePath string
	// fullFilePath is the path as resolved by the package filesystem,
	// used in user-facing error messages.
	fullFilePath string
}

// validateFunc is a validation applied to a single field definition, given
// the metadata of the fields file that defines it.
type validateFunc func(fileMetadata fieldFileMetadata, f field) specerrors.ValidationErrors

func validateFields(fsys fspath.FS, validate validateFunc) specerrors.ValidationErrors {
Expand Down Expand Up @@ -247,6 +297,23 @@ func readFieldsFolder(fsys fspath.FS, fieldsDir string) ([]string, error) {
return fieldsFiles, nil
}

// readPipelinesFolder lists the files found in the given ingest pipeline
// directory, as paths relative to the package root. A missing directory is
// not an error: an empty list is returned, since packages are not required
// to define ingest pipelines.
//
// Note: entries are not filtered by type or extension, so any
// subdirectories present would be returned too — assumes the spec only
// allows plain pipeline files here; confirm against the folder spec.
func readPipelinesFolder(fsys fspath.FS, pipelinesDir string) ([]string, error) {
	entries, err := fs.ReadDir(fsys, pipelinesDir)
	if errors.Is(err, os.ErrNotExist) {
		return []string{}, nil
	}
	if err != nil {
		return nil, fmt.Errorf("can't list pipelines directory (path: %s): %w", fsys.Path(pipelinesDir), err)
	}

	// Pre-size the result; the number of entries is already known.
	pipelineFiles := make([]string, 0, len(entries))
	for _, entry := range entries {
		pipelineFiles = append(pipelineFiles, path.Join(pipelinesDir, entry.Name()))
	}

	return pipelineFiles, nil
}

func unmarshalFields(fsys fspath.FS, fieldsPath string) (fields, error) {
content, err := fs.ReadFile(fsys, fieldsPath)
if err != nil {
Expand Down Expand Up @@ -292,4 +359,25 @@ func listTransforms(fsys fspath.FS) ([]string, error) {
list[i] = transform.Name()
}
return list, nil

}

// listPipelineFiles returns metadata for every ingest pipeline file defined
// by the given data stream, i.e. the files under
// <dataStreamDir>/<dataStream>/elasticsearch/ingest_pipeline. Data streams
// without pipelines yield an empty list.
func listPipelineFiles(fsys fspath.FS, dataStream string) ([]pipelineFileMetadata, error) {
	ingestPipelineDir := path.Join(dataStreamDir, dataStream, "elasticsearch", "ingest_pipeline")
	pipelineFiles, err := readPipelinesFolder(fsys, ingestPipelineDir)
	if err != nil {
		return nil, fmt.Errorf("cannot read pipeline files from integration package: %w", err)
	}

	// Pre-size the result: one metadata entry per pipeline file.
	pipelineFileMetadatas := make([]pipelineFileMetadata, 0, len(pipelineFiles))
	for _, file := range pipelineFiles {
		pipelineFileMetadatas = append(pipelineFileMetadatas, pipelineFileMetadata{
			dataStream:   dataStream,
			filePath:     file,
			fullFilePath: fsys.Path(file),
		})
	}

	return pipelineFileMetadatas, nil
}
95 changes: 95 additions & 0 deletions code/go/internal/validator/semantic/validate_pipeline_tags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package semantic

import (
"fmt"
"io/fs"

"gopkg.in/yaml.v3"

"github.com/elastic/package-spec/v3/code/go/internal/fspath"
"github.com/elastic/package-spec/v3/code/go/pkg/specerrors"
)

// ValidatePipelineTags validates ingest pipeline processor tags: every
// processor of every data stream pipeline must declare a non-empty, unique
// string tag (see validatePipelineTags and checkPipelineTag).
func ValidatePipelineTags(fsys fspath.FS) specerrors.ValidationErrors {
	dataStreams, err := listDataStreams(fsys)
	if err != nil {
		return specerrors.ValidationErrors{specerrors.NewStructuredError(err, specerrors.UnassignedCode)}
	}

	// Accumulate errors instead of returning on the first file-level
	// failure, so a problem in one pipeline file does not discard the
	// errors already collected from other files or data streams.
	var errs specerrors.ValidationErrors
	for _, dataStream := range dataStreams {
		pipelineFiles, err := listPipelineFiles(fsys, dataStream)
		if err != nil {
			errs = append(errs, specerrors.NewStructuredError(err, specerrors.UnassignedCode))
			continue
		}

		for _, pipelineFile := range pipelineFiles {
			content, err := fs.ReadFile(fsys, pipelineFile.filePath)
			if err != nil {
				errs = append(errs, specerrors.NewStructuredError(err, specerrors.UnassignedCode))
				continue
			}

			var pipeline ingestPipeline
			if err := yaml.Unmarshal(content, &pipeline); err != nil {
				errs = append(errs, specerrors.NewStructuredError(err, specerrors.UnassignedCode))
				continue
			}

			errs = append(errs, validatePipelineTags(&pipeline, pipelineFile.fullFilePath)...)
		}
	}

	return errs
}

// validatePipelineTags checks every top-level processor of the pipeline
// (and, through checkPipelineTag, the processors of nested on_failure
// handlers) for a required, non-empty, pipeline-unique tag. filename is
// only used to build the error messages.
func validatePipelineTags(pipeline *ingestPipeline, filename string) specerrors.ValidationErrors {
	var errs specerrors.ValidationErrors

	// Tags must be unique across the whole pipeline, so a single set is
	// shared by all processors.
	seen := make(map[string]struct{})
	// Iterate by index so we pass a pointer to the stored element instead
	// of the address of a per-iteration copy of the processor struct.
	for i := range pipeline.Processors {
		errs = append(errs, checkPipelineTag(&pipeline.Processors[i], seen, filename)...)
	}

	return errs
}

func checkPipelineTag(proc *processor, seen map[string]struct{}, filename string) specerrors.ValidationErrors {
var errors specerrors.ValidationErrors

for _, subProc := range proc.OnFailure {
subErrors := checkPipelineTag(&subProc, seen, filename)
errors = append(errors, subErrors...)
}

raw, ok := proc.Attributes["tag"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking into the pipeline spec

i don't see there the property "tag" defined...

am i right assuming the definition should be here? if so, could we control the required validation version from the json patch rules?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I noticed that tag wasn't defined there as well. We're also missing all of the required fields for each processor (i.e., the 'rename' processor requires the 'field' property).

In this case, though, tag isn't a required field by elasticsearch, rather, we're requiring it on certain processors in certain situations (only in the processors list, but no the global on_failure handler). For that reason, I wouldn't want to apply the tag enforcement globally.

Copy link
Member

@jsoriano jsoriano Nov 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we control the required validation version from the json patch rules?

Yes, you are right, it would be better to handle this from the json schema if possible. Though as implemented here it is easier to make it skippable, and to provide better error messages. If brought to the schema we'd need to add rules to code/go/internal/validator/spec.go. Also not sure about how to prevent duplications from the spec. So I am on the fence on this.

Maybe an option would be to use both, validate that there are tags in the spec, and check with a semantic rule that tags are not duplicated.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also not sure about how to prevent duplications from the spec.

Well, this seems to be easy too: https://json-schema.org/understanding-json-schema/reference/array#uniqueItems

@taylor-swanson wdyt about rewriting this with the spec?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's certainly an interesting idea... How would that work though since tag is nested a bit deeper in the tree compared to the processor itself?

processors:
  - set:
      tag: tag_1
  - set:
      tag: tag_2

It ends of being array.object.object to get down to tag.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, it would be complex to check unique items in different objects... I guess this part would still need to be in code.

if !ok {
errors = append(errors, specerrors.NewStructuredError(fmt.Errorf("file %q is invalid: %s processor at line %d missing required tag", filename, proc.Type, proc.position.line), specerrors.CodePipelineTagRequired))
return errors
}

tag, ok := raw.(string)
if !ok {
errors = append(errors, specerrors.NewStructuredError(fmt.Errorf("file %q is invalid: %s processor at line %d has invalid tag value", filename, proc.Type, proc.position.line), specerrors.CodePipelineTagRequired))
return errors
}
if tag == "" {
errors = append(errors, specerrors.NewStructuredError(fmt.Errorf("file %q is invalid: %s processor at line %d has empty tag value", filename, proc.Type, proc.position.line), specerrors.CodePipelineTagRequired))
return errors
}

if _, dup := seen[tag]; dup {
errors = append(errors, specerrors.NewStructuredErrorf("file %q is invalid: %s processor at line %d has duplicate tag value: %q", filename, proc.Type, proc.position.line, tag))
return errors
}

seen[tag] = struct{}{}

return errors
}
150 changes: 150 additions & 0 deletions code/go/internal/validator/semantic/validate_pipeline_tags_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package semantic

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
)

// TestValidatePipelineTags exercises validatePipelineTags against small
// inline pipeline definitions. Each case decodes a YAML pipeline and asserts
// the exact set of validation error messages produced; the line numbers in
// the expected messages refer to lines within the YAML literal (the leading
// newline of each literal counts as line 1).
func TestValidatePipelineTags(t *testing.T) {
	testCases := []struct {
		name     string // subtest name
		pipeline string // YAML pipeline definition under test
		errors   []string // expected error messages; empty means valid
	}{
		{
			// Unique tags everywhere, including the nested on_failure
			// processor: no errors expected.
			name: "good",
			pipeline: `
processors:
  - set:
      tag: set_1
      field: key1
      value: value1
  - set:
      tag: set_2
      field: key2
      value: value2
      on_failure:
        - set:
            tag: onfail_1
            field: fail_key_1
            key: fail_value_1
`,
		},
		{
			// Top-level processor without a tag; its nested on_failure
			// processor is tagged, so only the parent is reported (line 3).
			name: "missing-tag",
			pipeline: `
processors:
  - set:
      field: key1
      value: value1
      on_failure:
        - set:
            tag: onfail_1
            field: fail_key_1
            key: fail_value_1
`,
			errors: []string{
				`file "default.yml" is invalid: set processor at line 3 missing required tag (SVR00006)`,
			},
		},
		{
			// Tag missing only on the nested on_failure processor (line 8).
			name: "missing-tag-nested",
			pipeline: `
processors:
  - set:
      tag: set_1
      field: key1
      value: value1
      on_failure:
        - set:
            field: fail_key_1
            value: fail_value_1
`,
			errors: []string{
				`file "default.yml" is invalid: set processor at line 8 missing required tag (SVR00006)`,
			},
		},
		{
			// Two sibling processors sharing a tag; the second occurrence
			// (line 7) is the one reported.
			name: "duplicate-tag",
			pipeline: `
processors:
  - set:
      tag: set_1
      field: key1
      value: value1
  - set:
      tag: set_1
      field: key2
      value: value2
`,
			errors: []string{
				`file "default.yml" is invalid: set processor at line 7 has duplicate tag value: "set_1"`,
			},
		},
		{
			// Nested on_failure processor reuses the parent's tag. Nested
			// processors are checked first, so the parent (line 3) is the
			// one reported as the duplicate.
			name: "duplicate-nested-tag",
			pipeline: `
processors:
  - set:
      tag: set_1
      field: key1
      value: value1
      on_failure:
        - set:
            tag: set_1
            field: fail_key_1
            value: fail_value_1
`,
			errors: []string{
				`file "default.yml" is invalid: set processor at line 3 has duplicate tag value: "set_1"`,
			},
		},
		{
			// A tag that decodes to a non-string YAML value is rejected.
			name: "invalid-tag-value",
			pipeline: `
processors:
  - set:
      tag: 1
      field: key1
      value: value1
`,
			errors: []string{
				`file "default.yml" is invalid: set processor at line 3 has invalid tag value (SVR00006)`,
			},
		},
		{
			// An explicitly empty string tag is rejected.
			name: "empty-tag-value",
			pipeline: `
processors:
  - set:
      tag: ''
      field: key1
      value: value1
`,
			errors: []string{
				`file "default.yml" is invalid: set processor at line 3 has empty tag value (SVR00006)`,
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			var pipeline ingestPipeline
			err := yaml.Unmarshal([]byte(tc.pipeline), &pipeline)
			require.NoError(t, err)

			// Compare as sets of equal size: every produced error must be
			// expected, and the counts must match.
			errors := validatePipelineTags(&pipeline, "default.yml")
			assert.Len(t, errors, len(tc.errors))
			for _, err := range errors {
				assert.Contains(t, tc.errors, err.Error())
			}
		})
	}
}
1 change: 1 addition & 0 deletions code/go/internal/validator/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ func (s Spec) rules(pkgType string, rootSpec spectypes.ItemSpec) validationRules
{fn: semantic.ValidateInputPackagesPolicyTemplates, types: []string{"input"}},
{fn: semantic.ValidateMinimumAgentVersion},
{fn: semantic.ValidateIntegrationPolicyTemplates, types: []string{"integration"}},
{fn: semantic.ValidatePipelineTags, types: []string{"integration"}, since: semver.MustParse("3.6.0")},
}

var validationRules validationRules
Expand Down
1 change: 1 addition & 0 deletions code/go/pkg/specerrors/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ const (
CodeKibanaDanglingObjectsIDs = "SVR00003"
CodeVisualizationByValue = "SVR00004"
CodeMinimumKibanaVersion = "SVR00005"
CodePipelineTagRequired = "SVR00006"
)
6 changes: 6 additions & 0 deletions code/go/pkg/validator/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,12 @@ func TestValidateIngestPipelines(t *testing.T) {
"field processors.2.remove.if: rename \"message\" to \"event.original\" processor requires remove \"message\" processor with if: 'ctx.event?.original != null' (JSE00001)",
},
},
"bad_pipeline_tags": {
"example": []string{
"set processor at line 4 missing required tag (SVR00006)",
"set processor at line 15 has duplicate tag value: \"set_sample_field\"",
},
},
}

for pkgName, pipelines := range tests {
Expand Down
Loading