1- // Copyright 2023 Blink Labs Software
1+ // Copyright 2025 Blink Labs Software
22//
33// Licensed under the Apache License, Version 2.0 (the "License");
44// you may not use this file except in compliance with the License.
1515package pipeline
1616
1717import (
18+ "errors"
1819 "fmt"
1920 "sync"
2021
@@ -31,6 +32,7 @@ type Pipeline struct {
3132 errorChan chan error
3233 doneChan chan bool
3334 wg sync.WaitGroup
35+ stopOnce sync.Once
3436}
3537
3638func New () * Pipeline {
@@ -55,44 +57,61 @@ func (p *Pipeline) AddOutput(output plugin.Plugin) {
5557 p .outputs = append (p .outputs , output )
5658}
5759
58- func (p * Pipeline ) ErrorChan () chan error {
60+ // ErrorChan is read-only
61+ func (p * Pipeline ) ErrorChan () <- chan error {
5962 return p .errorChan
6063}
6164
6265// Start initiates the configured plugins and starts the necessary background processes to run the pipeline
6366func (p * Pipeline ) Start () error {
67+ // Check if doneChan is already closed this happens if pipeline was stopped
68+ // A stopped pipeline cannot be restarted
69+ select {
70+ case <- p .doneChan :
71+ return errors .New ("cannot start a stopped pipeline" )
72+ default :
73+ // continue
74+ }
75+
6476 // Start inputs
6577 for _ , input := range p .inputs {
6678 if err := input .Start (); err != nil {
6779 return fmt .Errorf ("failed to start input: %w" , err )
6880 }
6981 // Start background process to send input events to combined filter channel
82+ p .wg .Add (1 )
7083 go p .chanCopyLoop (input .OutputChan (), p .filterChan )
7184 // Start background error listener
85+ p .wg .Add (1 )
7286 go p .errorChanWait (input .ErrorChan ())
7387 }
7488 // Start filters
7589 for idx , filter := range p .filters {
7690 if err := filter .Start (); err != nil {
77- return fmt .Errorf ("failed to start input : %w" , err )
91+ return fmt .Errorf ("failed to start filter : %w" , err )
7892 }
7993 if idx == 0 {
8094 // Start background process to send events from combined filter channel to first filter plugin
95+ p .wg .Add (1 )
8196 go p .chanCopyLoop (p .filterChan , filter .InputChan ())
8297 } else {
8398 // Start background process to send events from previous filter plugin to current filter plugin
99+ p .wg .Add (1 )
84100 go p .chanCopyLoop (p .filters [idx - 1 ].OutputChan (), filter .InputChan ())
85101 }
86102 if idx == len (p .filters )- 1 {
87103 // Start background process to send events from last filter to combined output channel
104+ p .wg .Add (1 )
88105 go p .chanCopyLoop (filter .OutputChan (), p .outputChan )
89106 }
90107 // Start background error listener
108+ p .wg .Add (1 )
91109 go p .errorChanWait (filter .ErrorChan ())
92110 }
93111 if len (p .filters ) == 0 {
94112 // Start background process to send events from combined filter channel to combined output channel if
95113 // there are no filter plugins
114+ p .wg .Add (1 )
96115 go p .chanCopyLoop (p .filterChan , p .outputChan )
97116 }
98117 // Start outputs
@@ -101,78 +120,114 @@ func (p *Pipeline) Start() error {
101120 return fmt .Errorf ("failed to start output: %w" , err )
102121 }
103122 // Start background error listener
123+ p .wg .Add (1 )
104124 go p .errorChanWait (output .ErrorChan ())
105125 }
126+ p .wg .Add (1 )
106127 go p .outputChanLoop ()
107128 return nil
108129}
109130
110131// Stop shuts down the pipeline and all plugins
132+ // Stop is idempotent and safe to call multiple times
133+ // A stopped pipeline cannot be restarted
111134func (p * Pipeline ) Stop () error {
112- close (p .doneChan )
113- p .wg .Wait ()
114- close (p .errorChan )
115- close (p .filterChan )
116- close (p .outputChan )
117- // Stop inputs
118- for _ , input := range p .inputs {
119- if err := input .Stop (); err != nil {
120- return fmt .Errorf ("failed to stop input: %w" , err )
135+ var stopErrors []error
136+
137+ p .stopOnce .Do (func () {
138+ close (p .doneChan )
139+ p .wg .Wait ()
140+
141+ // Stop plugins and collect errors
142+ for _ , input := range p .inputs {
143+ if err := input .Stop (); err != nil {
144+ stopErrors = append (stopErrors , fmt .Errorf ("failed to stop input: %w" , err ))
145+ }
121146 }
122- }
123- // Stop outputs
124- for _ , output := range p .outputs {
125- if err := output .Stop (); err != nil {
126- return fmt .Errorf ("failed to stop output: %w" , err )
147+ for _ , filter := range p .filters {
148+ if err := filter .Stop (); err != nil {
149+ stopErrors = append (stopErrors , fmt .Errorf ("failed to stop filter: %w" , err ))
150+ }
127151 }
128- }
129- return nil
152+ for _ , output := range p .outputs {
153+ if err := output .Stop (); err != nil {
154+ stopErrors = append (stopErrors , fmt .Errorf ("failed to stop output: %w" , err ))
155+ }
156+ }
157+
158+ close (p .errorChan )
159+ close (p .filterChan )
160+ close (p .outputChan )
161+ })
162+
163+ return errors .Join (stopErrors ... )
130164}
131165
132166// chanCopyLoop is a generic function for reading an event from one channel and writing it to another in a loop
133167func (p * Pipeline ) chanCopyLoop (
134168 input <- chan event.Event ,
135169 output chan <- event.Event ,
136170) {
137- p .wg .Add ( 1 )
171+ defer p .wg .Done ( )
138172 for {
139173 select {
140174 case <- p .doneChan :
141- p .wg .Done ()
142175 return
143176 case evt , ok := <- input :
144- if ok {
145- // Copy input event to output chan
146- output <- evt
177+ if ! ok {
178+ return
179+ }
180+ select {
181+ // Pass input event to output chan
182+ case output <- evt :
183+ case <- p .doneChan :
184+ return
147185 }
148186 }
149187 }
150188}
151189
152190// outputChanLoop reads events from the output channel and writes them to each output plugin's input channel
153191func (p * Pipeline ) outputChanLoop () {
154- p .wg .Add ( 1 )
192+ defer p .wg .Done ( )
155193 for {
156194 select {
157195 case <- p .doneChan :
158- p .wg .Done ()
159196 return
160197 case evt , ok := <- p .outputChan :
161- if ok {
162- // Send event to all output plugins
163- for _ , output := range p .outputs {
164- output .InputChan () <- evt
198+ if ! ok {
199+ return
200+ }
201+ // Send event to all output plugins
202+ for _ , output := range p .outputs {
203+ select {
204+ case output .InputChan () <- evt :
205+ case <- p .doneChan :
206+ return
165207 }
166208 }
167209 }
168210 }
169211}
170212
171- // errorChanWait reads from an error channel. If an error is received, it's copied to the plugin error channel and the plugin stopped
213+ // errorChanWait reads from an error channel. If an error is received, it's copied to the plugin error channel
172214func (p * Pipeline ) errorChanWait (errorChan chan error ) {
173- err , ok := <- errorChan
174- if ok {
175- p .errorChan <- err
176- _ = p .Stop ()
215+ defer p .wg .Done ()
216+ for {
217+ select {
218+ case <- p .doneChan :
219+ return
220+ case err , ok := <- errorChan :
221+ if ! ok {
222+ // Channel closed
223+ return
224+ }
225+ // Forward plugin error to pipeline error channel
226+ select {
227+ case p .errorChan <- err :
228+ case <- p .doneChan :
229+ return
230+ }
231+ }
177232 }
178233}
0 commit comments