Skip to content

Commit 808efee

Browse files
authored
Remove reroute processors from coverage reports (#1647)
Remove the reroute processors from the pipeline stats to generate the coverage reports. And at the same time, to be able to use the right lines in the coverage reports, it is needed to keep the original contents (the ones read from the file) of the pipeline.
1 parent 8b4aaa0 commit 808efee

File tree

4 files changed

+37
-10
lines changed

4 files changed

+37
-10
lines changed

internal/elasticsearch/ingest/datastream.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,17 +98,18 @@ func loadIngestPipelineFiles(dataStreamPath string, nonce int64) ([]Pipeline, er
9898
return nil, err
9999
}
100100

101-
c, err = addRerouteProcessors(c, dataStreamPath, path)
101+
cWithRerouteProcessors, err := addRerouteProcessors(c, dataStreamPath, path)
102102
if err != nil {
103103
return nil, err
104104
}
105105

106106
name := filepath.Base(path)
107107
pipelines = append(pipelines, Pipeline{
108-
Path: path,
109-
Name: getPipelineNameWithNonce(name[:strings.Index(name, ".")], nonce),
110-
Format: filepath.Ext(path)[1:],
111-
Content: c,
108+
Path: path,
109+
Name: getPipelineNameWithNonce(name[:strings.Index(name, ".")], nonce),
110+
Format: filepath.Ext(path)[1:],
111+
Content: cWithRerouteProcessors,
112+
ContentOriginal: c,
112113
})
113114
}
114115
return pipelines, nil

internal/elasticsearch/ingest/pipeline.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,11 @@ type pipelineIngestedDocument struct {
3636

3737
// Pipeline represents a pipeline resource loaded from a file
3838
type Pipeline struct {
39-
Path string // Path of the file with the pipeline definition.
40-
Name string // Name of the pipeline.
41-
Format string // Format (extension) of the pipeline.
42-
Content []byte // Content is the original file contents.
39+
Path string // Path of the file with the pipeline definition.
40+
Name string // Name of the pipeline.
41+
Format string // Format (extension) of the pipeline.
42+
Content []byte // Content is the pipeline file contents with reroute processors if any.
43+
ContentOriginal []byte // Content is the original file contents.
4344
}
4445

4546
// Filename returns the original filename associated with the pipeline.

internal/elasticsearch/ingest/processors.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,20 @@ func (p Pipeline) Processors() (procs []Processor, err error) {
3636
return procs, nil
3737
}
3838

39+
// OriginalProcessors return the original list of processors in an ingest pipeline.
40+
func (p Pipeline) OriginalProcessors() (procs []Processor, err error) {
41+
switch p.Format {
42+
case "yaml", "yml", "json":
43+
procs, err = processorsFromYAML(p.ContentOriginal)
44+
default:
45+
return nil, fmt.Errorf("unsupported pipeline format: %s", p.Format)
46+
}
47+
if err != nil {
48+
return nil, fmt.Errorf("failure processing %s pipeline '%s': %w", p.Format, p.Filename(), err)
49+
}
50+
return procs, nil
51+
}
52+
3953
// extract a list of processors from a pipeline definition in YAML format.
4054
func processorsFromYAML(content []byte) (procs []Processor, err error) {
4155
var p struct {

internal/testrunner/runners/pipeline/coverage.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func GetPipelineCoverage(options testrunner.TestOptions, pipelines []ingest.Pipe
105105

106106
func pipelineDataForCoverage(pipeline ingest.Pipeline, stats ingest.PipelineStatsMap, basePath, dataStreamPath string) (string, string, []ingest.Processor, ingest.PipelineStats, error) {
107107
// Load the list of main processors from the pipeline source code, annotated with line numbers.
108-
src, err := pipeline.Processors()
108+
src, err := pipeline.OriginalProcessors()
109109
if err != nil {
110110
return "", "", nil, ingest.PipelineStats{}, err
111111
}
@@ -115,6 +115,17 @@ func pipelineDataForCoverage(pipeline ingest.Pipeline, stats ingest.PipelineStat
115115
return "", "", nil, ingest.PipelineStats{}, fmt.Errorf("pipeline '%s' not installed in Elasticsearch", pipeline.Name)
116116
}
117117

118+
// Remove reroute processors if any so the pipeline has the same processors as in the file
119+
// reroute processors are added if there are any routing_rules file defined
120+
var processors []ingest.ProcessorStats
121+
for _, proc := range pstats.Processors {
122+
if proc.Type == "reroute" {
123+
continue
124+
}
125+
processors = append(processors, proc)
126+
}
127+
pstats.Processors = processors
128+
118129
// Ensure there is no inconsistency in the list of processors in stats vs obtained from source.
119130
if len(src) != len(pstats.Processors) {
120131
return "", "", nil, ingest.PipelineStats{}, fmt.Errorf("processor count mismatch for %s (src:%d stats:%d)", pipeline.Filename(), len(src), len(pstats.Processors))

0 commit comments

Comments
 (0)