Skip to content

Commit 63558c4

Browse files
authored
Fail on wrong pipeline config with duplicated stage names (#1090)
Fixes #333
1 parent b855e8e commit 63558c4

File tree

2 files changed

+38
-0
lines changed

2 files changed

+38
-0
lines changed

pkg/pipeline/pipeline_builder.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,9 @@ func (b *builder) presetIngester(ing ingest.Ingester) {
161161
func (b *builder) readStages() error {
162162
for _, param := range b.configParams {
163163
log.Debugf("stage = %v", param.Name)
164+
if _, exists := b.pipelineEntryMap[param.Name]; exists {
165+
return fmt.Errorf("duplicate stage name '%s' found in pipeline definition", param.Name)
166+
}
164167
pEntry := pipelineEntry{
165168
stageName: param.Name,
166169
stageType: findStageType(&param),

pkg/pipeline/pipeline_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,3 +283,38 @@ func BenchmarkPipeline(b *testing.B) {
283283
p.Run()
284284
}
285285
}
286+
287+
const invalidConfig = `---
288+
log-level: debug
289+
pipeline:
290+
- name: ingest
291+
- name: test
292+
follows: ingest
293+
- name: test
294+
follows: test
295+
parameters:
296+
- name: ingest
297+
ingest:
298+
type: file
299+
file:
300+
filename: ../../hack/examples/ocp-ipfix-flowlogs.json
301+
decoder:
302+
type: json
303+
- name: test
304+
transform:
305+
type: generic
306+
generic: {}
307+
- name: test
308+
write:
309+
type: none
310+
`
311+
312+
func Test_InvalidPipeline_DuplicatedNames(t *testing.T) {
313+
var mainPipeline *Pipeline
314+
var err error
315+
_, cfg := test.InitConfig(t, invalidConfig)
316+
317+
mainPipeline, err = NewPipeline(cfg)
318+
assert.ErrorContains(t, err, "duplicate stage name 'test' found in pipeline definition")
319+
assert.Nil(t, mainPipeline)
320+
}

0 commit comments

Comments
 (0)